Repository: incubator-airflow
Updated Branches:
  refs/heads/master 48135ad25 -> 2e3f07ff9


[AIRFLOW-1160] Update Spark parameters for Mesos

Closes #2265 from cameres/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2e3f07ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2e3f07ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2e3f07ff

Branch: refs/heads/master
Commit: 2e3f07ff9fb3a969285f4b3f412cd63d95c05457
Parents: 48135ad
Author: Connor Ameres <connorame...@gmail.com>
Authored: Mon May 1 23:22:04 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon May 1 23:22:04 2017 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/spark_sql_hook.py               | 8 +++++++-
 airflow/contrib/hooks/spark_submit_hook.py            | 8 +++++++-
 airflow/contrib/operators/spark_sql_operator.py       | 7 ++++++-
 airflow/contrib/operators/spark_submit_operator.py    | 7 ++++++-
 tests/contrib/hooks/test_spark_submit_hook.py         | 2 ++
 tests/contrib/operators/test_spark_submit_operator.py | 2 ++
 6 files changed, 30 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2e3f07ff/airflow/contrib/hooks/spark_sql_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_sql_hook.py 
b/airflow/contrib/hooks/spark_sql_hook.py
index 724409c..725db21 100644
--- a/airflow/contrib/hooks/spark_sql_hook.py
+++ b/airflow/contrib/hooks/spark_sql_hook.py
@@ -31,7 +31,9 @@ class SparkSqlHook(BaseHook):
     :type conf: str (format: PROP=VALUE)
     :param conn_id: connection_id string
     :type conn_id: str
-    :param executor_cores: Number of cores per executor
+    :param total_executor_cores: (Standalone & Mesos only) Total cores for all 
executors (Default: all the available cores on the worker)
+    :type total_executor_cores: int
+    :param executor_cores: (Standalone & YARN only) Number of cores per 
executor (Default: 2)
     :type executor_cores: int
     :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
     :type executor_memory: str
@@ -52,6 +54,7 @@ class SparkSqlHook(BaseHook):
                  sql,
                  conf=None,
                  conn_id='spark_sql_default',
+                 total_executor_cores=None,
                  executor_cores=None,
                  executor_memory=None,
                  keytab=None,
@@ -64,6 +67,7 @@ class SparkSqlHook(BaseHook):
         self._sql = sql
         self._conf = conf
         self._conn = self.get_connection(conn_id)
+        self._total_executor_cores = total_executor_cores
         self._executor_cores = executor_cores
         self._executor_memory = executor_memory
         self._keytab = keytab
@@ -89,6 +93,8 @@ class SparkSqlHook(BaseHook):
         if self._conf:
             for conf_el in self._conf.split(","):
                 connection_cmd += ["--conf", conf_el]
+        if self._total_executor_cores:
+            connection_cmd += ["--total-executor-cores", 
str(self._total_executor_cores)]
         if self._executor_cores:
             connection_cmd += ["--executor-cores", str(self._executor_cores)]
         if self._executor_memory:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2e3f07ff/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py 
b/airflow/contrib/hooks/spark_submit_hook.py
index e4ce797..c34538e 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -42,7 +42,9 @@ class SparkSubmitHook(BaseHook):
     :type jars: str
     :param java_class: the main class of the Java application
     :type java_class: str
-    :param executor_cores: Number of cores per executor (Default: 2)
+    :param total_executor_cores: (Standalone & Mesos only) Total cores for all 
executors (Default: all the available cores on the worker)
+    :type total_executor_cores: int
+    :param executor_cores: (Standalone & YARN only) Number of cores per 
executor (Default: 2)
     :type executor_cores: int
     :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
     :type executor_memory: str
@@ -69,6 +71,7 @@ class SparkSubmitHook(BaseHook):
                  py_files=None,
                  jars=None,
                  java_class=None,
+                 total_executor_cores=None,
                  executor_cores=None,
                  executor_memory=None,
                  driver_memory=None,
@@ -84,6 +87,7 @@ class SparkSubmitHook(BaseHook):
         self._py_files = py_files
         self._jars = jars
         self._java_class = java_class
+        self._total_executor_cores = total_executor_cores
         self._executor_cores = executor_cores
         self._executor_memory = executor_memory
         self._driver_memory = driver_memory
@@ -163,6 +167,8 @@ class SparkSubmitHook(BaseHook):
             connection_cmd += ["--jars", self._jars]
         if self._num_executors:
             connection_cmd += ["--num-executors", str(self._num_executors)]
+        if self._total_executor_cores:
+            connection_cmd += ["--total-executor-cores", 
str(self._total_executor_cores)]
         if self._executor_cores:
             connection_cmd += ["--executor-cores", str(self._executor_cores)]
         if self._executor_memory:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2e3f07ff/airflow/contrib/operators/spark_sql_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/spark_sql_operator.py 
b/airflow/contrib/operators/spark_sql_operator.py
index 2bed535..246e808 100644
--- a/airflow/contrib/operators/spark_sql_operator.py
+++ b/airflow/contrib/operators/spark_sql_operator.py
@@ -26,7 +26,9 @@ class SparkSqlOperator(BaseOperator):
     :type conf: str (format: PROP=VALUE)
     :param conn_id: connection_id string
     :type conn_id: str
-    :param executor_cores: Number of cores per executor
+    :param total_executor_cores: (Standalone & Mesos only) Total cores for all 
executors (Default: all the available cores on the worker)
+    :type total_executor_cores: int
+    :param executor_cores: (Standalone & YARN only) Number of cores per 
executor (Default: 2)
     :type executor_cores: int
     :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
     :type executor_memory: str
@@ -52,6 +54,7 @@ class SparkSqlOperator(BaseOperator):
                  sql,
                  conf=None,
                  conn_id='spark_sql_default',
+                 total_executor_cores=None,
                  executor_cores=None,
                  executor_memory=None,
                  keytab=None,
@@ -65,6 +68,7 @@ class SparkSqlOperator(BaseOperator):
         self._sql = sql
         self._conf = conf
         self._conn_id = conn_id
+        self._total_executor_cores = total_executor_cores
         self._executor_cores = executor_cores
         self._executor_memory = executor_memory
         self._keytab = keytab
@@ -81,6 +85,7 @@ class SparkSqlOperator(BaseOperator):
         self._hook = SparkSqlHook(sql=self._sql,
                                   conf=self._conf,
                                   conn_id=self._conn_id,
+                                  
total_executor_cores=self._total_executor_cores,
                                   executor_cores=self._executor_cores,
                                   executor_memory=self._executor_memory,
                                   keytab=self._keytab,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2e3f07ff/airflow/contrib/operators/spark_submit_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/spark_submit_operator.py 
b/airflow/contrib/operators/spark_submit_operator.py
index 2a7e3cf..77aacd3 100644
--- a/airflow/contrib/operators/spark_submit_operator.py
+++ b/airflow/contrib/operators/spark_submit_operator.py
@@ -42,7 +42,9 @@ class SparkSubmitOperator(BaseOperator):
     :type jars: str
     :param java_class: the main class of the Java application
     :type java_class: str
-    :param executor_cores: Number of cores per executor (Default: 2)
+    :param total_executor_cores: (Standalone & Mesos only) Total cores for all 
executors (Default: all the available cores on the worker)
+    :type total_executor_cores: int
+    :param executor_cores: (Standalone & YARN only) Number of cores per 
executor (Default: 2)
     :type executor_cores: int
     :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
     :type executor_memory: str
@@ -71,6 +73,7 @@ class SparkSubmitOperator(BaseOperator):
                  py_files=None,
                  jars=None,
                  java_class=None,
+                 total_executor_cores=None,
                  executor_cores=None,
                  executor_memory=None,
                  driver_memory=None,
@@ -89,6 +92,7 @@ class SparkSubmitOperator(BaseOperator):
         self._py_files = py_files
         self._jars = jars
         self._java_class = java_class
+        self._total_executor_cores = total_executor_cores
         self._executor_cores = executor_cores
         self._executor_memory = executor_memory
         self._driver_memory = driver_memory
@@ -112,6 +116,7 @@ class SparkSubmitOperator(BaseOperator):
             py_files=self._py_files,
             jars=self._jars,
             java_class=self._java_class,
+            total_executor_cores=self._total_executor_cores,
             executor_cores=self._executor_cores,
             executor_memory=self._executor_memory,
             driver_memory=self._driver_memory,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2e3f07ff/tests/contrib/hooks/test_spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_spark_submit_hook.py 
b/tests/contrib/hooks/test_spark_submit_hook.py
index 81916ad..e06d44c 100644
--- a/tests/contrib/hooks/test_spark_submit_hook.py
+++ b/tests/contrib/hooks/test_spark_submit_hook.py
@@ -34,6 +34,7 @@ class TestSparkSubmitHook(unittest.TestCase):
         'files': 'hive-site.xml',
         'py_files': 'sample_library.py',
         'jars': 'parquet.jar',
+        'total_executor_cores': 4,
         'executor_cores': 4,
         'executor_memory': '22g',
         'keytab': 'privileged_user.keytab',
@@ -94,6 +95,7 @@ class TestSparkSubmitHook(unittest.TestCase):
         assert "--files {}".format(self._config['files']) in cmd
         assert "--py-files {}".format(self._config['py_files']) in cmd
         assert "--jars {}".format(self._config['jars']) in cmd
+        assert "--total-executor-cores 
{}".format(self._config['total_executor_cores']) in cmd
         assert "--executor-cores {}".format(self._config['executor_cores']) in 
cmd
         assert "--executor-memory {}".format(self._config['executor_memory']) 
in cmd
         assert "--keytab {}".format(self._config['keytab']) in cmd

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2e3f07ff/tests/contrib/operators/test_spark_submit_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_spark_submit_operator.py 
b/tests/contrib/operators/test_spark_submit_operator.py
index dd3d84b..6bed6a1 100644
--- a/tests/contrib/operators/test_spark_submit_operator.py
+++ b/tests/contrib/operators/test_spark_submit_operator.py
@@ -32,6 +32,7 @@ class TestSparkSubmitOperator(unittest.TestCase):
         'files': 'hive-site.xml',
         'py_files': 'sample_library.py',
         'jars': 'parquet.jar',
+        'total_executor_cores': 4,
         'executor_cores': 4,
         'executor_memory': '22g',
         'keytab': 'privileged_user.keytab',
@@ -75,6 +76,7 @@ class TestSparkSubmitOperator(unittest.TestCase):
         self.assertEqual(self._config['files'], operator._files)
         self.assertEqual(self._config['py_files'], operator._py_files)
         self.assertEqual(self._config['jars'], operator._jars)
+        self.assertEqual(self._config['total_executor_cores'], 
operator._total_executor_cores)
         self.assertEqual(self._config['executor_cores'], 
operator._executor_cores)
         self.assertEqual(self._config['executor_memory'], 
operator._executor_memory)
         self.assertEqual(self._config['keytab'], operator._keytab)

Reply via email to