This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 764f09c  SAMZA-2589: Consolidate Beam and High/Low Samza Apps launch workflow (#1428)
764f09c is described below

commit 764f09c6239cdec961fe4f52ae97b8f374db107d
Author: Ke Wu <k...@linkedin.com>
AuthorDate: Fri Sep 11 15:33:50 2020 -0700

    SAMZA-2589: Consolidate Beam and High/Low Samza Apps launch workflow (#1428)
    
    Issues: app.main.class is only set for Beam apps, which causes a different workflow on the AM when launching a job.
    Changes:
    1. Introduce DefaultApplicationMain to capture the launch workflow for High/Low level jobs, so that on the AM all jobs are launched in the same way: ClusterBasedJobCoordinatorRunner#main -> app.main.class -> JobCoordinatorLaunchUtil
    2. Update ApplicationConfig#getAppMainClass to default to DefaultApplicationMain
    
    Tests:
    1. Unit Tests
    2. Deployed the hello-samza job successfully with this change, following the instructions at http://samza.apache.org/startup/hello-samza/latest/
    
    API Changes: None
    Upgrade Instructions: None
    Usage Instructions: None
---
 gradle/dependency-versions.gradle                  |  2 +-
 .../ClusterBasedJobCoordinatorRunner.java          | 38 ++++-------
 .../clustermanager/DefaultApplicationMain.java     | 47 ++++++++++++++
 .../org/apache/samza/config/ApplicationConfig.java | 10 +--
 .../clustermanager/TestDefaultApplicationMain.java | 74 ++++++++++++++++++++++
 5 files changed, 136 insertions(+), 35 deletions(-)

diff --git a/gradle/dependency-versions.gradle 
b/gradle/dependency-versions.gradle
index 061f0b7..28acb2a 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -36,7 +36,7 @@
   jerseyVersion = "2.22.1"
   jettyVersion = "9.4.20.v20190813"
   jodaTimeVersion = "2.2"
-  joptSimpleVersion = "3.2"
+  joptSimpleVersion = "5.0.4"
   junitVersion = "4.12"
   kafkaVersion = "2.0.1"
   log4jVersion = "1.2.17"
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
index 107c473..a152032 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
@@ -26,17 +26,14 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.ApplicationUtil;
 import org.apache.samza.classloader.IsolatingClassLoaderFactory;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
-import org.apache.samza.util.ConfigUtil;
 import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.SplitDeploymentUtil;
 import org.slf4j.Logger;
@@ -94,30 +91,16 @@ public class ClusterBasedJobCoordinatorRunner {
        * For Beam jobs, app.main.class will be Beam's main class
        * and app.main.args will be Beam's pipeline options.
        */
-      if (appConfig.getAppMainClass().isPresent()) {
-        String className = appConfig.getAppMainClass().get();
-        LOG.info("Invoke main {}", className);
-        try {
-          Class<?> cls = Class.forName(className);
-          Method mainMethod = cls.getMethod("main", String[].class);
-          mainMethod.invoke(null, (Object) toArgs(appConfig));
-        } catch (Exception e) {
-          throw new SamzaException(e);
-        }
-      } else {
-        JobConfig jobConfig = new JobConfig(submissionConfig);
-
-        if (!jobConfig.getConfigLoaderFactory().isPresent()) {
-          throw new SamzaException(JobConfig.CONFIG_LOADER_FACTORY + " is 
required to initialize job coordinator from config loader");
-        }
-
-        // load full job config with ConfigLoader
-        Config originalConfig = ConfigUtil.loadConfig(submissionConfig);
-
-        
JobCoordinatorLaunchUtil.run(ApplicationUtil.fromConfig(originalConfig), 
originalConfig);
+      String className = appConfig.getAppMainClass();
+      String[] arguments = toArgs(appConfig);
+      LOG.info("Invoke main {} with args {}", className, arguments);
+      try {
+        Class<?> cls = Class.forName(className);
+        Method mainMethod = cls.getMethod("main", String[].class);
+        mainMethod.invoke(null, (Object) arguments);
+      } catch (Exception e) {
+        throw new SamzaException(e);
       }
-
-      LOG.info("Finished running ClusterBasedJobCoordinator");
     } else {
       // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is 
removed.
       Config coordinatorSystemConfig;
@@ -133,8 +116,9 @@ public class ClusterBasedJobCoordinatorRunner {
       }
       ClusterBasedJobCoordinator jc = 
createFromMetadataStore(coordinatorSystemConfig);
       jc.run();
-      LOG.info("Finished running ClusterBasedJobCoordinator");
     }
+
+    LOG.info("Finished running ClusterBasedJobCoordinator");
   }
 
   /**
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/DefaultApplicationMain.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/DefaultApplicationMain.java
new file mode 100644
index 0000000..9611bf9
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/DefaultApplicationMain.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import com.google.common.annotations.VisibleForTesting;
+import joptsimple.OptionSet;
+import org.apache.samza.application.ApplicationUtil;
+import org.apache.samza.config.Config;
+import org.apache.samza.runtime.ApplicationRunnerMain;
+import org.apache.samza.util.ConfigUtil;
+
+
+public class DefaultApplicationMain {
+
+  public static void main(String[] args) {
+    run(args);
+  }
+
+  @VisibleForTesting
+  static void run(String[] args) {
+    // This branch is ONLY for Yarn deployments, standalone apps uses offspring
+    final ApplicationRunnerMain.ApplicationRunnerCommandLine cmdLine = new 
ApplicationRunnerMain.ApplicationRunnerCommandLine();
+    cmdLine.parser().allowsUnrecognizedOptions();
+
+    final OptionSet options = cmdLine.parser().parse(args);
+    // load full job config with ConfigLoader
+    final Config originalConfig = 
ConfigUtil.loadConfig(cmdLine.loadConfig(options));
+
+    JobCoordinatorLaunchUtil.run(ApplicationUtil.fromConfig(originalConfig), 
originalConfig);
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
index ea2e943..3dabd05 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
@@ -18,7 +18,7 @@
  */
 package org.apache.samza.config;
 
-import java.util.Optional;
+import org.apache.samza.clustermanager.DefaultApplicationMain;
 import org.apache.samza.runtime.UUIDGenerator;
 
 
@@ -103,12 +103,8 @@ public class ApplicationConfig extends MapConfig {
     return ApplicationMode.valueOf(get(APP_MODE, 
ApplicationMode.STREAM.name()).toUpperCase());
   }
 
-  public Optional<String> getAppMainArgs() {
-    return Optional.ofNullable(get(APP_MAIN_ARGS));
-  }
-
-  public Optional<String> getAppMainClass() {
-    return Optional.ofNullable(get(APP_MAIN_CLASS));
+  public String getAppMainClass() {
+    return get(APP_MAIN_CLASS, DefaultApplicationMain.class.getName());
   }
 
   public String getAppRunnerClass() {
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestDefaultApplicationMain.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestDefaultApplicationMain.java
new file mode 100644
index 0000000..57fbed8
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestDefaultApplicationMain.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.application.ApplicationUtil;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
+import org.apache.samza.util.ConfigUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.times;
+import static org.powermock.api.mockito.PowerMockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.verifyStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+    ApplicationUtil.class,
+    ConfigUtil.class,
+    JobCoordinatorLaunchUtil.class,
+    ClusterBasedJobCoordinatorRunner.class})
+public class TestDefaultApplicationMain {
+
+  @Test
+  public void testRun() throws Exception {
+    String[] args = new String[] {
+        "--config",
+        JobConfig.CONFIG_LOADER_FACTORY + "=" + 
PropertiesConfigLoaderFactory.class.getName(),
+        "--config",
+        PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + 
"path=" + getClass().getResource("/test.properties").getPath()
+    };
+
+    StreamApplication mockApplication = mock(StreamApplication.class);
+    Config mockConfig = mock(Config.class);
+    mockStatic(JobCoordinatorLaunchUtil.class, ApplicationUtil.class, 
ConfigUtil.class);
+
+    when(ApplicationUtil.fromConfig(any()))
+        .thenReturn(mockApplication);
+    when(ConfigUtil.loadConfig(any()))
+        .thenReturn(mockConfig);
+    doNothing()
+        .when(JobCoordinatorLaunchUtil.class, "run",
+            mockApplication, mockConfig);
+    DefaultApplicationMain.run(args);
+
+    verifyStatic(times(1));
+    JobCoordinatorLaunchUtil.run(mockApplication, mockConfig);
+  }
+}

Reply via email to