This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ae33b82d78 [SPARK-44264][PYTHON] E2E Testing for Deepspeed
9ae33b82d78 is described below

commit 9ae33b82d78d65f58acb253c0b710b5807f9912d
Author: Mathew Jacob <mathew.ja...@databricks.com>
AuthorDate: Fri Jul 28 09:14:45 2023 +0900

    [SPARK-44264][PYTHON] E2E Testing for Deepspeed
    
    ### What Changes Were Proposed
    This PR adds some end to end tests for the DeepspeedTorchDistributor. Due 
to the lack of support currently available to deepspeed, we use proxy functions 
that are very simple to test that the command works. For the future, once 
Deepspeed supports more CPU, these end to end tests should instead migrate to 
more actual Deepspeed workloads, such as those described in the PR comments.
    
    ### Why Do We Need These Changes
    Previously, we only had unit tests for helper functions. These test actual 
workloads that a user may use DeepspeedTorchDistributor from end to end.
    
    ### Any User Facing Changes
    No, these are end to end tests.
    
    ### How Was This Tested
    Running the tests and seeing if they pass.
    
    Closes #42118 from mathewjacob1002/gpu_e2e_tests.
    
    Authored-by: Mathew Jacob <mathew.ja...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 dev/requirements.txt                               |   3 +
 dev/tox.ini                                        |   1 +
 python/mypy.ini                                    |   3 +
 .../deepspeed/tests/test_deepspeed_distributor.py  | 134 ++++++++++++++++++++-
 4 files changed, 139 insertions(+), 2 deletions(-)

diff --git a/dev/requirements.txt b/dev/requirements.txt
index f5fe5fa071f..38a9b244710 100644
--- a/dev/requirements.txt
+++ b/dev/requirements.txt
@@ -65,3 +65,6 @@ torch
 torchvision
 torcheval
 
+# DeepspeedTorchDistributor dependencies
+deepspeed
+
diff --git a/dev/tox.ini b/dev/tox.ini
index e2a77786ed4..438f82fec1e 100644
--- a/dev/tox.ini
+++ b/dev/tox.ini
@@ -46,6 +46,7 @@ exclude =
     */target/*,
     docs/.local_ruby_bundle/,
     *python/pyspark/cloudpickle/*.py,
+    *python/pyspark/ml/deepspeed/tests/*.py
     *python/docs/build/*,
     *python/docs/source/conf.py,
     *python/.eggs/*,
diff --git a/python/mypy.ini b/python/mypy.ini
index a845cd88bd8..4d1fc3ceb66 100644
--- a/python/mypy.ini
+++ b/python/mypy.ini
@@ -88,6 +88,9 @@ ignore_errors = True
 [mypy-pyspark.ml.torch.tests.*]
 ignore_errors = True
 
+[mypy-pyspark.ml.deepspeed.tests.*]
+ignore_errors = True
+
 [mypy-pyspark.mllib.tests.*]
 ignore_errors = True
 
diff --git a/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py 
b/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py
index 4c4606699a3..590e541c384 100644
--- a/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py
+++ b/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py
@@ -1,4 +1,4 @@
-#
+# mypy: ignore-errors
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -14,12 +14,28 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from contextlib import contextmanager
 import os
+import shutil
 import sys
-from typing import Any, Tuple, Dict
+import textwrap
+from typing import Any, Callable, Dict, Tuple
 import unittest
 
+from pyspark import SparkConf, SparkContext
 from pyspark.ml.deepspeed.deepspeed_distributor import 
DeepspeedTorchDistributor
+from pyspark.sql import SparkSession
+from pyspark.ml.torch.tests.test_distributor import (
+    get_local_mode_conf,
+    set_up_test_dirs,
+    get_distributed_mode_conf,
+)
+
+have_deepspeed = True
+try:
+    import deepspeed  # noqa: F401
+except ImportError:
+    have_deepspeed = False
 
 
 class DeepspeedTorchDistributorUnitTests(unittest.TestCase):
@@ -164,6 +180,120 @@ class 
DeepspeedTorchDistributorUnitTests(unittest.TestCase):
             self.assertEqual(distributed_cmd_args_expected, 
distributed_command_with_args)
 
 
+def _create_basic_function() -> Callable:
+    # TODO: swap out with better test function
+    # once Deepspeed better supports CPU
+    def pythagoras(leg1: float, leg2: float) -> float:
+        import deepspeed
+
+        print(deepspeed.__version__)
+        return (leg1 * leg1 + leg2 * leg2) ** 0.5
+
+    return pythagoras
+
+
+@contextmanager
+def _create_pytorch_training_test_file():
+    # Note: when Deepspeed CPU support becomes better,
+    # switch in more realistic training files using Deepspeed
+    # optimizations + constructs
+    str_to_write = textwrap.dedent(
+        """ 
+            import sys
+            def pythagorean_thm(x : int, y: int): # type: ignore 
+                import deepspeed # type: ignore
+                return (x*x + y*y)**0.5 # type: ignore
+            print(pythagorean_thm(int(sys.argv[1]), int(sys.argv[2])))"""
+    )
+    cp_path = "/tmp/test_deepspeed_training_file.py"
+    with open(cp_path, "w") as f:
+        f.write(str_to_write)
+    yield cp_path
+    os.remove(cp_path)
+
+
+# The program and function that we use in the end-to-end tests
+# is very simple because in the Spark CI we only have access
+# to CPUs and at this point in time, CPU support is limited
+# in Deepspeed. Once Deepspeed better supports CPU training
+# and inference, the hope is to switch out the training
+# and file for the tests with more realistic testing
+# that use Deepspeed constructs.
+@unittest.skipIf(not have_deepspeed, "deepspeed is required for these tests")
+class DeepspeedTorchDistributorDistributedEndToEnd(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls) -> None:
+        (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = 
set_up_test_dirs()  # noqa
+        # "loadDefaults" is set to False because if not, the SparkConf will
+        # use contain configurations from the LocalEndToEnd test,
+        # which causes the test to break.
+        conf = SparkConf(loadDefaults=False)
+        for k, v in get_distributed_mode_conf().items():
+            conf = conf.set(k, v)
+        conf = conf.set(
+            "spark.worker.resource.gpu.discoveryScript", 
cls.gpu_discovery_script_file_name
+        )
+        sc = SparkContext("local-cluster[2,2,512]", cls.__name__, conf=conf)
+        cls.spark = SparkSession(sc)
+
+    @classmethod
+    def tearDownClass(cls) -> None:
+        shutil.rmtree(cls.mnist_dir_path)
+        os.unlink(cls.gpu_discovery_script_file_name)
+        cls.spark.stop()
+
+    def test_simple_function_e2e(self) -> None:
+        train_fn = _create_basic_function()
+        # Arguments for the pythagoras function train_fn
+        x = 3
+        y = 4
+        dist = DeepspeedTorchDistributor(numGpus=2, useGpu=False, 
localMode=False)
+        output = dist.run(train_fn, x, y)
+        self.assertEqual(output, 5)
+
+    def test_pytorch_file_e2e(self) -> None:
+        # TODO: change to better test script
+        # once Deepspeed CPU support is better
+        with _create_pytorch_training_test_file() as cp_path:
+            dist = DeepspeedTorchDistributor(numGpus=True, useGpu=False, 
localMode=False)
+            dist.run(cp_path, 2, 5)
+
+
+@unittest.skipIf(not have_deepspeed, "deepspeed is required for these tests")
+class DeepspeedDistributorLocalEndToEndTests(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls) -> None:
+        cls.gpu_discovery_script_file_name, cls.mnist_dir_path = 
set_up_test_dirs()  # noqa
+        conf = SparkConf()
+        for k, v in get_local_mode_conf().items():
+            conf = conf.set(k, v)
+        conf = conf.set(
+            "spark.driver.resource.gpu.discoveryScript", 
cls.gpu_discovery_script_file_name
+        )
+        sc = SparkContext("local-cluster[2,2,512]", cls.__name__, conf=conf)
+        cls.spark = SparkSession(sc)
+
+    @classmethod
+    def tearDownClass(cls) -> None:
+        shutil.rmtree(cls.mnist_dir_path)
+        os.unlink(cls.gpu_discovery_script_file_name)
+        cls.spark.stop()
+
+    def test_simple_function_e2e(self) -> None:
+        train_fn = _create_basic_function()
+        # Arguments for the pythagoras function train_fn
+        x = 3
+        y = 4
+        dist = DeepspeedTorchDistributor(numGpus=2, useGpu=False, 
localMode=True)
+        output = dist.run(train_fn, x, y)
+        self.assertEqual(output, 5)
+
+    def test_pytorch_file_e2e(self) -> None:
+        with _create_pytorch_training_test_file() as path_to_train_file:
+            dist = DeepspeedTorchDistributor(numGpus=2, useGpu=False, 
localMode=True)
+            dist.run(path_to_train_file, 2, 5)
+
+
 if __name__ == "__main__":
     from pyspark.ml.deepspeed.tests.test_deepspeed_distributor import *  # 
noqa: F401,F403
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to