This is an automated email from the ASF dual-hosted git repository. goenka pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 4b67847 [BEAM-7110] Add Spark master option to SparkJobServerDriver new 81faf35 Merge pull request #8379 from ibzib/spark-master2 4b67847 is described below commit 4b67847193da92cc8e59d37b0539bfb2bc6ab37f Author: Kyle Weaver <kcwea...@google.com> AuthorDate: Thu Apr 18 12:54:03 2019 -0700 [BEAM-7110] Add Spark master option to SparkJobServerDriver --- runners/spark/job-server/build.gradle | 2 ++ .../apache/beam/runners/spark/SparkJobInvoker.java | 14 ++++++++++--- .../beam/runners/spark/SparkJobServerDriver.java | 23 +++++++++++++++++----- .../beam/runners/spark/SparkPipelineOptions.java | 4 +++- 4 files changed, 34 insertions(+), 9 deletions(-) diff --git a/runners/spark/job-server/build.gradle b/runners/spark/job-server/build.gradle index 2ce34fe..7ebde87 100644 --- a/runners/spark/job-server/build.gradle +++ b/runners/spark/job-server/build.gradle @@ -70,6 +70,8 @@ runShadow { args += ["--artifacts-dir=${project.property('artifactsDir')}"] if (project.hasProperty('cleanArtifactsPerJob')) args += ["--clean-artifacts-per-job=${project.property('cleanArtifactsPerJob')}"] + if (project.hasProperty('sparkMasterUrl')) + args += ["--spark-master-url=${project.property('sparkMasterUrl')}"] // Enable remote debugging. jvmArgs = ["-Xdebug", "-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"] diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java index e47c851..da35ae2 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java @@ -34,12 +34,16 @@ public class SparkJobInvoker extends JobInvoker { private static final Logger LOG = LoggerFactory.getLogger(SparkJobInvoker.class); - public static SparkJobInvoker create() { - return new SparkJobInvoker(); + private SparkJobServerDriver.SparkServerConfiguration configuration; + + public static SparkJobInvoker create( + SparkJobServerDriver.SparkServerConfiguration configuration) { + return new SparkJobInvoker(configuration); } - private SparkJobInvoker() { + private SparkJobInvoker(SparkJobServerDriver.SparkServerConfiguration configuration) { super("spark-runner-job-invoker"); + this.configuration = configuration; } @Override @@ -56,6 +60,10 @@ public class SparkJobInvoker extends JobInvoker { String.format("%s_%s", sparkOptions.getJobName(), UUID.randomUUID().toString()); LOG.info("Invoking job {}", invocationId); + if (sparkOptions.getSparkMaster().equals(SparkPipelineOptions.DEFAULT_MASTER_URL)) { + sparkOptions.setSparkMaster(configuration.getSparkMasterUrl()); + } + // Options can't be translated to proto if runner class is unresolvable, so set it to null. sparkOptions.setRunner(null); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobServerDriver.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobServerDriver.java index 387907f..0589045 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobServerDriver.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobServerDriver.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.kohsuke.args4j.CmdLineException; import org.kohsuke.args4j.CmdLineParser; +import org.kohsuke.args4j.Option; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,11 +33,23 @@ public class SparkJobServerDriver extends JobServerDriver { @Override protected JobInvoker createJobInvoker() { - return SparkJobInvoker.create(); + return SparkJobInvoker.create((SparkServerConfiguration) configuration); } private static final Logger LOG = LoggerFactory.getLogger(SparkJobServerDriver.class); + /** Spark runner-specific Configuration for the jobServer. */ + public static class SparkServerConfiguration extends ServerConfiguration { + @Option( + name = "--spark-master-url", + usage = "Spark master url to submit job (e.g. spark://host:port, local[4])") + private String sparkMasterUrl = SparkPipelineOptions.DEFAULT_MASTER_URL; + + String getSparkMasterUrl() { + return this.sparkMasterUrl; + } + } + public static void main(String[] args) { FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create()); fromParams(args).run(); @@ -50,7 +63,7 @@ public class SparkJobServerDriver extends JobServerDriver { } private static SparkJobServerDriver fromParams(String[] args) { - ServerConfiguration configuration = new ServerConfiguration(); + SparkServerConfiguration configuration = new SparkServerConfiguration(); CmdLineParser parser = new CmdLineParser(configuration); try { parser.parseArgument(args); @@ -63,7 +76,7 @@ public class SparkJobServerDriver extends JobServerDriver { return fromConfig(configuration); } - private static SparkJobServerDriver fromConfig(ServerConfiguration configuration) { + private static SparkJobServerDriver fromConfig(SparkServerConfiguration configuration) { return create( configuration, createJobServerFactory(configuration), @@ -71,14 +84,14 @@ public class SparkJobServerDriver extends JobServerDriver { } private static SparkJobServerDriver create( - ServerConfiguration configuration, + SparkServerConfiguration configuration, ServerFactory jobServerFactory, ServerFactory artifactServerFactory) { return new SparkJobServerDriver(configuration, jobServerFactory, artifactServerFactory); } private SparkJobServerDriver( - ServerConfiguration configuration, + SparkServerConfiguration configuration, ServerFactory jobServerFactory, ServerFactory artifactServerFactory) { super(configuration, jobServerFactory, artifactServerFactory); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java index 6935b54..4bf51e8 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java @@ -33,8 +33,10 @@ import org.apache.beam.sdk.options.StreamingOptions; public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions, ApplicationNameOptions { + String DEFAULT_MASTER_URL = "local[4]"; + @Description("The url of the spark master to connect to, (e.g. spark://host:port, local[4]).") - @Default.String("local[4]") + @Default.String(DEFAULT_MASTER_URL) String getSparkMaster(); void setSparkMaster(String master);