This is an automated email from the ASF dual-hosted git repository. bharathkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new 764f09c SAMZA-2589: Consolidate Beam and High/Low Samza Apps launch workflow (#1428) 764f09c is described below commit 764f09c6239cdec961fe4f52ae97b8f374db107d Author: Ke Wu <k...@linkedin.com> AuthorDate: Fri Sep 11 15:33:50 2020 -0700 SAMZA-2589: Consolidate Beam and High/Low Samza Apps launch workflow (#1428) Issues: app.main.class is only set for Beam apps, which causes a different workflow on the AM when launching a job. Changes: 1. Introduce DefaultApplicationMain to capture the launch workflow for High/Low level jobs so that, on the AM, all jobs are launched in the same way: ClusterBasedJobCoordinatorRunner#main -> app.main.class -> JobCoordinatorLaunchUtil 2. Update ApplicationConfig#getAppMainClass to default to DefaultApplicationMain Tests: 1. Unit Tests 2. Deployed the hello samza job successfully with the change, following the instructions on http://samza.apache.org/startup/hello-samza/latest/ API Changes: None Upgrade Instructions: None Usage Instructions: None --- gradle/dependency-versions.gradle | 2 +- .../ClusterBasedJobCoordinatorRunner.java | 38 ++++------- .../clustermanager/DefaultApplicationMain.java | 47 ++++++++++++++ .../org/apache/samza/config/ApplicationConfig.java | 10 +-- .../clustermanager/TestDefaultApplicationMain.java | 74 ++++++++++++++++++++++ 5 files changed, 136 insertions(+), 35 deletions(-) diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index 061f0b7..28acb2a 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -36,7 +36,7 @@ jerseyVersion = "2.22.1" jettyVersion = "9.4.20.v20190813" jodaTimeVersion = "2.2" - joptSimpleVersion = "3.2" + joptSimpleVersion = "5.0.4" junitVersion = "4.12" kafkaVersion = "2.0.1" log4jVersion = "1.2.17" diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java index 107c473..a152032 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java @@ -26,17 +26,14 @@ import java.util.Arrays; import java.util.List; import org.apache.commons.lang3.StringUtils; import org.apache.samza.SamzaException; -import org.apache.samza.application.ApplicationUtil; import org.apache.samza.classloader.IsolatingClassLoaderFactory; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.ShellCommandConfig; import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.serializers.model.SamzaObjectMapper; -import org.apache.samza.util.ConfigUtil; import org.apache.samza.util.CoordinatorStreamUtil; import org.apache.samza.util.SplitDeploymentUtil; import org.slf4j.Logger; @@ -94,30 +91,16 @@ public class ClusterBasedJobCoordinatorRunner { * For Beam jobs, app.main.class will be Beam's main class * and app.main.args will be Beam's pipeline options. 
*/ - if (appConfig.getAppMainClass().isPresent()) { - String className = appConfig.getAppMainClass().get(); - LOG.info("Invoke main {}", className); - try { - Class<?> cls = Class.forName(className); - Method mainMethod = cls.getMethod("main", String[].class); - mainMethod.invoke(null, (Object) toArgs(appConfig)); - } catch (Exception e) { - throw new SamzaException(e); - } - } else { - JobConfig jobConfig = new JobConfig(submissionConfig); - - if (!jobConfig.getConfigLoaderFactory().isPresent()) { - throw new SamzaException(JobConfig.CONFIG_LOADER_FACTORY + " is required to initialize job coordinator from config loader"); - } - - // load full job config with ConfigLoader - Config originalConfig = ConfigUtil.loadConfig(submissionConfig); - - JobCoordinatorLaunchUtil.run(ApplicationUtil.fromConfig(originalConfig), originalConfig); + String className = appConfig.getAppMainClass(); + String[] arguments = toArgs(appConfig); + LOG.info("Invoke main {} with args {}", className, arguments); + try { + Class<?> cls = Class.forName(className); + Method mainMethod = cls.getMethod("main", String[].class); + mainMethod.invoke(null, (Object) arguments); + } catch (Exception e) { + throw new SamzaException(e); } - - LOG.info("Finished running ClusterBasedJobCoordinator"); } else { // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed. 
Config coordinatorSystemConfig; @@ -133,8 +116,9 @@ public class ClusterBasedJobCoordinatorRunner { } ClusterBasedJobCoordinator jc = createFromMetadataStore(coordinatorSystemConfig); jc.run(); - LOG.info("Finished running ClusterBasedJobCoordinator"); } + + LOG.info("Finished running ClusterBasedJobCoordinator"); } /** diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/DefaultApplicationMain.java b/samza-core/src/main/java/org/apache/samza/clustermanager/DefaultApplicationMain.java new file mode 100644 index 0000000..9611bf9 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/DefaultApplicationMain.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.clustermanager; + +import com.google.common.annotations.VisibleForTesting; +import joptsimple.OptionSet; +import org.apache.samza.application.ApplicationUtil; +import org.apache.samza.config.Config; +import org.apache.samza.runtime.ApplicationRunnerMain; +import org.apache.samza.util.ConfigUtil; + + +public class DefaultApplicationMain { + + public static void main(String[] args) { + run(args); + } + + @VisibleForTesting + static void run(String[] args) { + // This branch is ONLY for Yarn deployments, standalone apps uses offspring + final ApplicationRunnerMain.ApplicationRunnerCommandLine cmdLine = new ApplicationRunnerMain.ApplicationRunnerCommandLine(); + cmdLine.parser().allowsUnrecognizedOptions(); + + final OptionSet options = cmdLine.parser().parse(args); + // load full job config with ConfigLoader + final Config originalConfig = ConfigUtil.loadConfig(cmdLine.loadConfig(options)); + + JobCoordinatorLaunchUtil.run(ApplicationUtil.fromConfig(originalConfig), originalConfig); + } +} diff --git a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java index ea2e943..3dabd05 100644 --- a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java @@ -18,7 +18,7 @@ */ package org.apache.samza.config; -import java.util.Optional; +import org.apache.samza.clustermanager.DefaultApplicationMain; import org.apache.samza.runtime.UUIDGenerator; @@ -103,12 +103,8 @@ public class ApplicationConfig extends MapConfig { return ApplicationMode.valueOf(get(APP_MODE, ApplicationMode.STREAM.name()).toUpperCase()); } - public Optional<String> getAppMainArgs() { - return Optional.ofNullable(get(APP_MAIN_ARGS)); - } - - public Optional<String> getAppMainClass() { - return Optional.ofNullable(get(APP_MAIN_CLASS)); + public String getAppMainClass() { + return get(APP_MAIN_CLASS, 
DefaultApplicationMain.class.getName()); } public String getAppRunnerClass() { diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestDefaultApplicationMain.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestDefaultApplicationMain.java new file mode 100644 index 0000000..57fbed8 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestDefaultApplicationMain.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.clustermanager; + +import org.apache.samza.application.ApplicationUtil; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory; +import org.apache.samza.util.ConfigUtil; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.times; +import static org.powermock.api.mockito.PowerMockito.doNothing; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.mockStatic; +import static org.powermock.api.mockito.PowerMockito.verifyStatic; +import static org.powermock.api.mockito.PowerMockito.when; + + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ + ApplicationUtil.class, + ConfigUtil.class, + JobCoordinatorLaunchUtil.class, + ClusterBasedJobCoordinatorRunner.class}) +public class TestDefaultApplicationMain { + + @Test + public void testRun() throws Exception { + String[] args = new String[] { + "--config", + JobConfig.CONFIG_LOADER_FACTORY + "=" + PropertiesConfigLoaderFactory.class.getName(), + "--config", + PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path=" + getClass().getResource("/test.properties").getPath() + }; + + StreamApplication mockApplication = mock(StreamApplication.class); + Config mockConfig = mock(Config.class); + mockStatic(JobCoordinatorLaunchUtil.class, ApplicationUtil.class, ConfigUtil.class); + + when(ApplicationUtil.fromConfig(any())) + .thenReturn(mockApplication); + when(ConfigUtil.loadConfig(any())) + .thenReturn(mockConfig); + doNothing() + .when(JobCoordinatorLaunchUtil.class, "run", + mockApplication, mockConfig); + DefaultApplicationMain.run(args); + + verifyStatic(times(1)); + 
JobCoordinatorLaunchUtil.run(mockApplication, mockConfig); + } +}