This is an automated email from the ASF dual-hosted git repository.
kuanhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 28e3b76 SUBMARINE-1117. Connect API for CLI Experiments
28e3b76 is described below
commit 28e3b76d149b5cff538bd7ad4973c2e5970b3d09
Author: atosystem <[email protected]>
AuthorDate: Fri Jan 14 22:37:31 2022 +0800
SUBMARINE-1117. Connect API for CLI Experiments
### What is this PR for?
1. Connect the CLI experiment commands to the Submarine API, with formatted
output and asynchronous status fetching
2. Implement e2e test for CLI experiments
### What type of PR is it?
[Feature]
### Todos
### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-1117
### How should this be tested?
An e2e test is implemented (see tests/cli/test_experiment.py).
### Screenshots (if appropriate)

### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need new documentation? Yes
Author: atosystem <[email protected]>
Signed-off-by: kuanhsun <[email protected]>
Closes #831 from atosystem/SUBMARINE-1117 and squashes the following
commits:
48b5a622 [atosystem] SUBMARINE-1117. remove redundant line
02fbc118 [atosystem] SUBMARINE-1117. remove print
f7fcb1e3 [atosystem] SUBMARINE-1117. fix error
707582ec [atosystem] SUBMARINE-1117. remove wait experiment finish in test
7b0050ec [atosystem] SUBMARINE-1117. test github action no wait
e4eaf4a6 [atosystem] SUBMARINE-1117. test github action
23e0594c [atosystem] SUBMARINE-1117. test github action
f1e8b00d [atosystem] SUBMARINE-1117. test github action
9d7afc76 [atosystem] SUBMARINE-1117. revise hostname to port
45cbe829 [atosystem] SUBMARINE-1117. test github action
4cb1daa3 [atosystem] SUBMARINE-1117. test github action
ac26b775 [atosystem] SUBMARINE-1117. test github action
3430043a [atosystem] SUBMARINE-1117. test github action
208d0cbc [atosystem] SUBMARINE-1117. double check cli config
f2790afb [atosystem] SUBMARINE-1117. fix e2e test wrong port
db91cb41 [atosystem] SUBMARINE-1117. change generate_host
4a2985c6 [atosystem] SUBMARINE-1117. lint
5a9d1cfa [atosystem] SUBMARINE-1117. Use config in experiment api client
ef38dddb [atosystem] SUBMARINE-1117. Change python import path in command.py
0df473a7 [atosystem] SUBMARINE-1117. Change python import path
6a16b0f8 [atosystem] SUBMARINE-1117. Set default flag to no-wait
40d01972 [atosystem] SUBMARINE-1117. Add wait and no-wait options for
deleting experiments
3083c4f2 [atosystem] SUBMARINE-1117. Fix code for the comments
2e49d239 [atosystem] SUBMARINE-1117. lint
24ecd4d0 [atosystem] SUBMARINE-1117. Fix e2e test
d8bd7d6e [atosystem] SUBMARINE-1117. Add Integration Tests for CLI
Experiments
79439953 [atosystem] SUBMARINE-1117. Finished CLI for Experiment
---
.github/workflows/python.yml | 5 +
.../submarine/cli/experiment/command.py | 124 ++++++++++++++++++++-
.../submarine/client/api/experiment_client.py | 29 ++++-
.../pysubmarine/tests/cli/test_experiment.py | 85 +++++++++++---
4 files changed, 223 insertions(+), 20 deletions(-)
diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml
index 973a3c8..b7ac163 100644
--- a/.github/workflows/python.yml
+++ b/.github/workflows/python.yml
@@ -112,6 +112,11 @@ jobs:
pip install --no-cache-dir tensorflow-addons
pip install --no-cache-dir tf_slim
pip install -r
./submarine-sdk/pysubmarine/github-actions/test-requirements.txt
+ - name: Set CLI config for e2e pytest
+ run: |
+ submarine config set connection.hostname localhost
+ submarine config set connection.port 8080
+ submarine config list
- name: Run integration test
working-directory: ./submarine-sdk/pysubmarine
run: pytest --cov=submarine -vs -m "e2e"
diff --git a/submarine-sdk/pysubmarine/submarine/cli/experiment/command.py
b/submarine-sdk/pysubmarine/submarine/cli/experiment/command.py
index 6f2c182..27b1e87 100644
--- a/submarine-sdk/pysubmarine/submarine/cli/experiment/command.py
+++ b/submarine-sdk/pysubmarine/submarine/cli/experiment/command.py
@@ -15,24 +15,140 @@
under the License.
"""
+import json
+import time
+
import click
+from rich.console import Console
+from rich.json import JSON as richJSON
+from rich.panel import Panel
+from rich.table import Table
+
+from submarine.cli.config.config import loadConfig
+from submarine.client.api.experiment_client import ExperimentClient
+from submarine.client.exceptions import ApiException
+
import sys

# Load the CLI config once at import time; every experiment command below
# shares the single client built from it.
submarineCliConfig = loadConfig()
if submarineCliConfig is None:
    # No usable CLI config, so there is no host/port to connect to; abort.
    # sys.exit is used instead of the `exit` builtin, which is injected by the
    # `site` module and is not guaranteed to exist (e.g. `python -S`, frozen
    # executables).
    sys.exit(1)
experimentClient = ExperimentClient(
    host="http://{}:{}".format(
        submarineCliConfig.connection.hostname, submarineCliConfig.connection.port
    )
)

POLLING_INTERVAL = 1  # sec, delay between checks of an async request
TIMEOUT = 30  # sec, give up waiting for an async request after this long
# Fetch all experiments asynchronously, polling every POLLING_INTERVAL seconds
# (giving up after TIMEOUT seconds), and render them as a rich table.
@click.command("experiment")
def list_experiment():
    """List experiments"""
    COLS_TO_SHOW = [
        "Name",
        "Id",
        "Tags",
        "Finished Time",
        "Created Time",
        "Running Time",
        "Status",
    ]
    console = Console()
    try:
        thread = experimentClient.list_experiments_async()
        timeout = time.time() + TIMEOUT
        with console.status("[bold green] Fetching Experiments..."):
            while not thread.ready():
                time.sleep(POLLING_INTERVAL)
                if time.time() > timeout:
                    console.print("[bold red] Timeout!")
                    return

        result = thread.get()
        results = result.result

        table = Table(title="List of Experiments")
        for col in COLS_TO_SHOW:
            table.add_column(col, overflow="fold")
        # An empty/absent result list just renders an empty table.
        for r in results or []:
            meta = r["spec"]["meta"]
            table.add_row(
                meta["name"],
                r["experimentId"],
                # Experiments created without tags may carry None here.
                ",".join(meta.get("tags") or []),
                r["finishedTime"],
                r["createdTime"],
                r["runningTime"],
                r["status"],
            )

        console.print(table)

    except ApiException as err:
        # Prefer the server's "message" field; fall back to the exception text
        # when the body is missing or not the expected JSON error envelope.
        message = err
        if err.body is not None:
            try:
                message = json.loads(err.body)["message"]
            except (ValueError, KeyError, TypeError):
                message = err
        click.echo("[Api Error] {}".format(message))
# Fetch one experiment asynchronously, polling every POLLING_INTERVAL seconds
# (giving up after TIMEOUT seconds), and print it as JSON inside a panel.
@click.command("experiment")
@click.argument("id")
def get_experiment(id):
    """Get experiments"""
    console = Console()
    try:
        thread = experimentClient.get_experiment_async(id)
        timeout = time.time() + TIMEOUT
        with console.status("[bold green] Fetching Experiment(id = {} )...".format(id)):
            while not thread.ready():
                time.sleep(POLLING_INTERVAL)
                if time.time() > timeout:
                    console.print("[bold red] Timeout!")
                    return

        # get() re-raises any ApiException raised by the background request.
        result = thread.get()
        result = result.result

        json_data = richJSON.from_data(result)
        console.print(Panel(json_data, title="Experiment(id = {} )".format(id)))
    except ApiException as err:
        # Prefer the server's "message" field; fall back to the exception text
        # when the body is missing or not the expected JSON error envelope.
        message = err
        if err.body is not None:
            try:
                message = json.loads(err.body)["message"]
            except (ValueError, KeyError, TypeError):
                message = err
        click.echo("[Api Error] {}".format(message))
# Issue the delete request asynchronously and poll it every POLLING_INTERVAL
# seconds (giving up after TIMEOUT seconds). With --wait, report the status
# returned by the delete call.
# NOTE(review): --wait only inspects the status returned by the delete call;
# it does not poll until the experiment is actually gone, and --no-wait
# prints nothing on success. Confirm this is the intended UX.
@click.command("experiment")
@click.argument("id")
@click.option("--wait/--no-wait", is_flag=True, default=False)
def delete_experiment(id, wait):
    """Delete experiment"""
    console = Console()
    try:
        thread = experimentClient.delete_experiment_async(id)
        timeout = time.time() + TIMEOUT
        with console.status("[bold green] Deleting Experiment(id = {} )...".format(id)):
            while not thread.ready():
                time.sleep(POLLING_INTERVAL)
                if time.time() > timeout:
                    console.print("[bold red] Timeout!")
                    return

        result = thread.get()
        result = result.result

        if wait:
            # A result without a status is reported as a failure instead of
            # crashing with KeyError.
            status = (result or {}).get("status")
            if status == "Deleted":
                console.print("[bold green] Experiment(id = {} ) deleted".format(id))
            else:
                console.print("[bold red] Failed")
                json_data = richJSON.from_data(result)
                console.print(Panel(json_data, title="Experiment(id = {} )".format(id)))

    except ApiException as err:
        # Prefer the server's "message" field; fall back to the exception text
        # when the body is missing or not the expected JSON error envelope.
        message = err
        if err.body is not None:
            try:
                message = json.loads(err.body)["message"]
            except (ValueError, KeyError, TypeError):
                message = err
        click.echo("[Api Error] {}".format(message))
diff --git
a/submarine-sdk/pysubmarine/submarine/client/api/experiment_client.py
b/submarine-sdk/pysubmarine/submarine/client/api/experiment_client.py
index 498e497..210b1df 100644
--- a/submarine-sdk/pysubmarine/submarine/client/api/experiment_client.py
+++ b/submarine-sdk/pysubmarine/submarine/client/api/experiment_client.py
@@ -33,7 +33,7 @@ def generate_host():
"""
submarine_server_dns_name =
str(os.environ.get("SUBMARINE_SERVER_DNS_NAME"))
submarine_server_port = str(os.environ.get("SUBMARINE_SERVER_PORT"))
- host = submarine_server_dns_name + ":" + submarine_server_port
+ host = "http://" + submarine_server_dns_name + ":" + submarine_server_port
return host
@@ -107,6 +107,15 @@ class ExperimentClient:
response = self.experiment_api.get_experiment(id=id)
return response.result
def get_experiment_async(self, id):
    """
    Get the experiment's detailed info by id without blocking (async)
    :param id: submarine experiment id
    :return: multiprocessing.pool.ApplyResult; call .get() to obtain the response
    """
    return self.experiment_api.get_experiment(id=id, async_req=True)
def list_experiments(self, status=None):
"""
List all experiment for the user
@@ -116,6 +125,15 @@ class ExperimentClient:
response = self.experiment_api.list_experiments(status=status)
return response.result
def list_experiments_async(self, status=None):
    """
    List all experiments for the user without blocking (async)
    :param status: Accepted, Created, Running, Succeeded, Deleted
    :return: multiprocessing.pool.ApplyResult; call .get() to obtain the response
    """
    return self.experiment_api.list_experiments(status=status, async_req=True)
def delete_experiment(self, id):
"""
Delete the Submarine experiment
@@ -125,6 +143,15 @@ class ExperimentClient:
response = self.experiment_api.delete_experiment(id)
return response.result
def delete_experiment_async(self, id):
    """
    Delete the Submarine experiment (async)
    :param id: Submarine experiment id
    :return: multiprocessing.pool.ApplyResult; call .get() to obtain the
             response describing the deleted experiment
    """
    thread = self.experiment_api.delete_experiment(id, async_req=True)
    return thread
def get_log(self, id, onlyMaster=False):
"""
Get training logs of all pod of the experiment.
diff --git a/submarine-sdk/pysubmarine/tests/cli/test_experiment.py
b/submarine-sdk/pysubmarine/tests/cli/test_experiment.py
index 8d2ff44..e6203a9 100644
--- a/submarine-sdk/pysubmarine/tests/cli/test_experiment.py
+++ b/submarine-sdk/pysubmarine/tests/cli/test_experiment.py
@@ -13,29 +13,84 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import pytest
from click.testing import CliRunner
+import submarine
from submarine.cli import main
+from submarine.client.models.code_spec import CodeSpec
+from submarine.client.models.environment_spec import EnvironmentSpec
+from submarine.client.models.experiment_meta import ExperimentMeta
+from submarine.client.models.experiment_spec import ExperimentSpec
+from submarine.client.models.experiment_task_spec import ExperimentTaskSpec
# Wide enough that rich does not truncate or fold the experiment table columns.
TEST_CONSOLE_WIDTH = 191


@pytest.mark.e2e
def test_all_experiment_e2e():
    """E2E Test for using submarine CLI to access submarine experiment
    To run this test, you should first set
    your submarine CLI config `port` to 8080 and `hostname` to localhost
    i.e. please execute the commands in your terminal:
        submarine config set connection.hostname localhost
        submarine config set connection.port 8080
    """
    # set env to display full table
    runner = CliRunner(env={"COLUMNS": str(TEST_CONSOLE_WIDTH)})
    # check if cli config is correct for testing
    result = runner.invoke(main.entry_point, ["config", "get", "connection.port"])
    assert result.exit_code == 0
    assert "connection.port={}".format(8080) in result.output

    submarine_client = submarine.ExperimentClient(host="http://localhost:8080")
    environment = EnvironmentSpec(image="apache/submarine:tf-dist-mnist-test-1.0")
    experiment_meta = ExperimentMeta(
        name="mnist-dist",
        namespace="default",
        framework="Tensorflow",
        cmd="python /var/tf_dist_mnist/dist_mnist.py --train_steps=100",
        env_vars={"ENV1": "ENV1"},
    )

    worker_spec = ExperimentTaskSpec(resources="cpu=1,memory=1024M", replicas=1)
    ps_spec = ExperimentTaskSpec(resources="cpu=1,memory=1024M", replicas=1)

    code_spec = CodeSpec(sync_mode="git", url="https://github.com/apache/submarine.git")

    experiment_spec = ExperimentSpec(
        meta=experiment_meta,
        environment=environment,
        code=code_spec,
        spec={"Ps": ps_spec, "Worker": worker_spec},
    )
    experiment = submarine_client.create_experiment(experiment_spec=experiment_spec)
    experiment_id = experiment["experimentId"]
    # Tracks whether the CLI delete succeeded, so the finally-block only
    # cleans up experiments that would otherwise be left on the server.
    deleted = False
    try:
        experiment = submarine_client.get_experiment(experiment_id)

        # test list experiment
        result = runner.invoke(main.entry_point, ["list", "experiment"])
        assert result.exit_code == 0
        assert "List of Experiments" in result.output
        assert experiment["spec"]["meta"]["name"] in result.output
        assert experiment["experimentId"] in result.output
        assert experiment["createdTime"] in result.output
        if experiment["runningTime"] is not None:
            assert experiment["runningTime"] in result.output
        if experiment["status"] is not None:
            assert experiment["status"] in result.output

        # test get experiment
        result = runner.invoke(main.entry_point, ["get", "experiment", experiment_id])
        assert result.exit_code == 0
        assert "Experiment(id = {} )".format(experiment_id) in result.output
        assert experiment["spec"]["environment"]["image"] in result.output

        # test delete experiment (blocking mode)
        result = runner.invoke(
            main.entry_point, ["delete", "experiment", experiment_id, "--wait"]
        )
        assert result.exit_code == 0
        assert "Experiment(id = {} ) deleted".format(experiment_id) in result.output
        deleted = True

        # test get experiment fail after delete
        result = runner.invoke(main.entry_point, ["get", "experiment", experiment_id])
        assert "[Api Error] Not found experiment." in result.output
    finally:
        if not deleted:
            # Best-effort cleanup so a failing assertion does not leave the
            # experiment on the server; never mask the original failure.
            try:
                submarine_client.delete_experiment(experiment_id)
            except Exception:
                pass
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]