This is an automated email from the ASF dual-hosted git repository.

pingsutw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new fb5c0f9  SUBMARINE-869. Add distributed PyTorch example
fb5c0f9 is described below

commit fb5c0f9769accccf43684ad9b6e1378f02cb8bef
Author: featherchen <[email protected]>
AuthorDate: Fri Aug 27 13:13:18 2021 +0800

    SUBMARINE-869. Add distributed PyTorch example
    
    ### What is this PR for?
    Add a PyTorch distributed training example using the DistributedDataParallel (DDP) strategy.
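    
    The core of the example is the standard DDP wiring, sketched below (a minimal sketch, assuming the WORLD_SIZE, RANK, MASTER_ADDR, and MASTER_PORT environment variables are injected by the PyTorch operator; the linear model is a hypothetical stand-in for the example's CNN):
    
    ```python
    import os
    
    import torch.distributed as dist
    import torch.nn as nn
    
    world_size = int(os.environ.get('WORLD_SIZE', 1))
    rank = int(os.environ.get('RANK', 0))
    
    model = nn.Linear(784, 10)  # stand-in for the example's CNN
    if dist.is_available() and world_size > 1:
        # Each worker joins the process group, then wraps its local model so
        # that gradients are all-reduced across replicas on every backward pass.
        dist.init_process_group(backend=dist.Backend.GLOO,
                                world_size=world_size, rank=rank)
        model = nn.parallel.DistributedDataParallel(model)
    ```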
    
    ### What type of PR is it?
     Improvement
    
    ### Todos
    * [ ] - None
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/SUBMARINE-869
    
    ### How should this be tested?
    See the video and the README:
    https://www.loom.com/share/46ae1188f5fd47fd8677eb01ab5fe899
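    
    As a quick post-submission sanity check, one can also query the experiment REST API (a hedged sketch: it assumes the endpoint used by post.sh also answers GET requests and that the `requests` package is installed):
    
    ```python
    import requests
    
    # List experiments and confirm "ddp-example" is among them; GET support on
    # this endpoint is an assumption, not something this commit adds.
    resp = requests.get("http://127.0.0.1:32080/api/v1/experiment")
    resp.raise_for_status()
    print(resp.json())
    ```
    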
    ### Screenshots (if appropriate)
    ![Screenshot from 2021-08-24 11-46-07](https://user-images.githubusercontent.com/57944334/130553834-c618f20e-2f8b-4a3e-b993-58688ad3a6d6.png)
    ![Screenshot from 2021-08-24 11-46-13](https://user-images.githubusercontent.com/57944334/130553856-b4e7dc1e-8236-498e-8644-67a322e37dbf.png)
    ![Screenshot from 2021-08-24 11-46-34](https://user-images.githubusercontent.com/57944334/130553869-350a67e9-402e-4e8e-a81c-98726bd5c936.png)
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need new documentation? No
    
    Author: featherchen <[email protected]>
    Author: Kevin Su <[email protected]>
    
    Signed-off-by: Kevin <[email protected]>
    
    Closes #720 from featherchen/SUBMARINE-869 and squashes the following commits:
    
    04467656 [Kevin Su] Merge branch 'master' into SUBMARINE-869
    949452cb [featherchen] SUBMARINE-869. delete redundant comment
    e490379f [featherchen] SUBMARINE-869. README
    34dc3762 [featherchen] SUBMARINE-869. DDP example
    e2fccdbb [featherchen] add log_param and save_model
    a95cfe9f [featherchen] DDP example
    b137fbed [featherchen] distributive example
    57a7e19f [featherchen] SUBMARINE-869. add pytorch operator
---
 dev-support/examples/mnist-pytorch/DDP/Dockerfile  |  24 +++
 dev-support/examples/mnist-pytorch/DDP/build.sh    |  44 +++++
 .../mnist-pytorch/DDP/mnist_distributed.py         | 191 +++++++++++++++++++++
 dev-support/examples/mnist-pytorch/DDP/post.sh     |  42 +++++
 dev-support/examples/mnist-pytorch/DDP/readme.md   |  25 +++
 dev-support/examples/nn-pytorch/model.py           |   2 +-
 .../pysubmarine/submarine/models/utils.py          |   5 +-
 7 files changed, 328 insertions(+), 5 deletions(-)

diff --git a/dev-support/examples/mnist-pytorch/DDP/Dockerfile b/dev-support/examples/mnist-pytorch/DDP/Dockerfile
new file mode 100644
index 0000000..e5c217a
--- /dev/null
+++ b/dev-support/examples/mnist-pytorch/DDP/Dockerfile
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM python:3.7
+MAINTAINER Apache Software Foundation <[email protected]>
+
+ADD ./tmp/submarine-sdk /opt/
+RUN pip install pillow==8.2.0 && \
+    pip install /opt/pysubmarine[pytorch] && \
+    pip install tensorboardX
+
+ADD ./mnist_distributed.py /opt/
\ No newline at end of file
diff --git a/dev-support/examples/mnist-pytorch/DDP/build.sh b/dev-support/examples/mnist-pytorch/DDP/build.sh
new file mode 100755
index 0000000..84ece8f
--- /dev/null
+++ b/dev-support/examples/mnist-pytorch/DDP/build.sh
@@ -0,0 +1,44 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -euxo pipefail
+
+SUBMARINE_VERSION=0.6.0-SNAPSHOT
+SUBMARINE_IMAGE_NAME="ddp:${SUBMARINE_VERSION}"
+
+if [ -L "${BASH_SOURCE-$0}" ]; then
+  SCRIPT_DIR=$(dirname "$(readlink "${BASH_SOURCE-$0}")")
+else
+  SCRIPT_DIR=$(dirname "${BASH_SOURCE-$0}")
+fi
+export CURRENT_PATH=$(cd "${SCRIPT_DIR}" >/dev/null; pwd)
+export SUBMARINE_HOME=${CURRENT_PATH}/../../../..
+
+# If an old tmp folder is still there, delete it.
+if [ -d "${CURRENT_PATH}/tmp" ]; then
+  rm -rf "${CURRENT_PATH}/tmp"
+fi
+
+mkdir -p "${CURRENT_PATH}/tmp"
+cp -r "${SUBMARINE_HOME}/submarine-sdk" "${CURRENT_PATH}/tmp"
+
+# build image
+cd "${CURRENT_PATH}"
+echo "Start building the ${SUBMARINE_IMAGE_NAME} docker image ..."
+docker build -t "${SUBMARINE_IMAGE_NAME}" .
+
+# clean temp file
+rm -rf "${CURRENT_PATH}/tmp"
diff --git a/dev-support/examples/mnist-pytorch/DDP/mnist_distributed.py b/dev-support/examples/mnist-pytorch/DDP/mnist_distributed.py
new file mode 100644
index 0000000..fd24a4e
--- /dev/null
+++ b/dev-support/examples/mnist-pytorch/DDP/mnist_distributed.py
@@ -0,0 +1,191 @@
+"""
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+"""
+
+from __future__ import print_function
+
+import argparse
+import os
+
+import torch
+import torch.distributed as dist
+import torch.nn as nn
+import torch.nn.functional as F
+import torch.optim as optim
+from submarine import ModelsClient
+from tensorboardX import SummaryWriter
+from torchvision import datasets, transforms
+
+WORLD_SIZE = int(os.environ.get('WORLD_SIZE', 1))
+rank = int(os.environ.get('RANK', 0))
+
+print('WORLD_SIZE={}, RANK={}'.format(WORLD_SIZE, rank))
+class Net(nn.Module):
+    def __init__(self):
+        super(Net, self).__init__()
+        self.conv1 = nn.Conv2d(1, 20, 5, 1)
+        self.conv2 = nn.Conv2d(20, 50, 5, 1)
+        self.fc1 = nn.Linear(4*4*50, 500)
+        self.fc2 = nn.Linear(500, 10)
+
+    def forward(self, x):
+        x = F.relu(self.conv1(x))
+        x = F.max_pool2d(x, 2, 2)
+        x = F.relu(self.conv2(x))
+        x = F.max_pool2d(x, 2, 2)
+        x = x.view(-1, 4*4*50)
+        x = F.relu(self.fc1(x))
+        x = self.fc2(x)
+        return F.log_softmax(x, dim=1)
+
+def train(args, model, device, train_loader, optimizer, epoch, writer, periscope):
+    model.train()
+    for batch_idx, (data, target) in enumerate(train_loader):
+        data, target = data.to(device), target.to(device)
+        optimizer.zero_grad()
+        output = model(data)
+        loss = F.nll_loss(output, target)
+        loss.backward()
+        optimizer.step()
+        if batch_idx % args.log_interval == 0:
+            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}'.format(
+                epoch, batch_idx * len(data), len(train_loader.dataset),
+                100. * batch_idx / len(train_loader), loss.item()))
+            niter = epoch * len(train_loader) + batch_idx
+            writer.add_scalar('loss', loss.item(), niter)
+            periscope.log_metric('loss', loss.item(), niter)
+
+def test(args, model, device, test_loader, writer, epoch, periscope):
+    model.eval()
+    test_loss = 0
+    correct = 0
+    with torch.no_grad():
+        for data, target in test_loader:
+            data, target = data.to(device), target.to(device)
+            output = model(data)
+            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
+            pred = output.max(1, keepdim=True)[1]  # get the index of the max log-probability
+            correct += pred.eq(target.view_as(pred)).sum().item()
+
+    test_loss /= len(test_loader.dataset)
+    print('\naccuracy={:.4f}\n'.format(float(correct) / len(test_loader.dataset)))
+    writer.add_scalar('accuracy', float(correct) / len(test_loader.dataset), epoch)
+    periscope.log_metric('accuracy', float(correct) / len(test_loader.dataset), epoch)
+
+def should_distribute():
+    return dist.is_available() and WORLD_SIZE > 1
+
+
+def is_distributed():
+    return dist.is_available() and dist.is_initialized()
+
+
+if __name__ == '__main__':
+    # Training settings
+    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
+    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
+                        help='input batch size for training (default: 64)')
+    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
+                        help='input batch size for testing (default: 1000)')
+    parser.add_argument('--epochs', type=int, default=5, metavar='N',
+                        help='number of epochs to train (default: 5)')
+    parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
+                        help='learning rate (default: 0.01)')
+    parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
+                        help='SGD momentum (default: 0.5)')
+    parser.add_argument('--no-cuda', action='store_true', default=False,
+                        help='disables CUDA training')
+    parser.add_argument('--seed', type=int, default=1, metavar='S',
+                        help='random seed (default: 1)')
+    parser.add_argument('--log-interval', type=int, default=10, metavar='N',
+                        help='how many batches to wait before logging training status')
+    parser.add_argument('--save-model', action='store_true', default=False,
+                        help='For Saving the current Model')
+    parser.add_argument('--dir', default='logs', metavar='L',
+                        help='directory where summary logs are stored')
+    if dist.is_available():
+        parser.add_argument('--backend', type=str, help='Distributed backend',
+                            choices=[dist.Backend.GLOO, dist.Backend.NCCL, dist.Backend.MPI],
+                            default=dist.Backend.GLOO)
+    args = parser.parse_args()
+    use_cuda = not args.no_cuda and torch.cuda.is_available()
+    if use_cuda:
+        print('Using CUDA')
+    else:
+        print('Not using CUDA')
+
+    writer = SummaryWriter(args.dir)
+
+    torch.manual_seed(args.seed)
+
+    device = torch.device("cuda" if use_cuda else "cpu")
+
+    if should_distribute():
+        print('Using distributed PyTorch with {} backend'.format(args.backend))
+        dist.init_process_group(
+            backend=args.backend,
+            world_size=WORLD_SIZE,
+            rank=rank)
+
+    kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
+
+    train_dataset = datasets.FashionMNIST('../data', train=True, download=True,
+                       transform=transforms.Compose([
+                           transforms.ToTensor(),
+                           transforms.Normalize((0.1307,), (0.3081,))
+                       ]))
+    train_sampler = torch.utils.data.distributed.DistributedSampler(
+        train_dataset,
+        num_replicas=WORLD_SIZE,
+        rank=rank
+    )
+
+    train_loader = torch.utils.data.DataLoader(
+        dataset=train_dataset,
+        batch_size=args.batch_size,
+        shuffle=False,
+        sampler=train_sampler,
+        **kwargs)
+
+    test_loader = torch.utils.data.DataLoader(
+        datasets.FashionMNIST('../data', train=False, transform=transforms.Compose([
+                           transforms.ToTensor(),
+                           transforms.Normalize((0.1307,), (0.3081,))
+                       ])),
+        batch_size=args.test_batch_size, shuffle=False, **kwargs)
+
+    model = Net().to(device)
+
+    if is_distributed():
+        # DistributedDataParallel handles both CPU and GPU models; the separate
+        # DistributedDataParallelCPU class was removed in recent PyTorch releases.
+        model = nn.parallel.DistributedDataParallel(model)
+
+    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
+    periscope = ModelsClient()
+    with periscope.start() as run:
+        periscope.log_param("learning_rate", args.lr)
+        periscope.log_param("batch_size", args.batch_size)
+        for epoch in range(1, args.epochs + 1):
+            train(args, model, device, train_loader, optimizer, epoch, writer, periscope)
+            test(args, model, device, test_loader, writer, epoch, periscope)
+    if args.save_model:
+        torch.save(model.state_dict(), "mnist_cnn.pt")
+
+"""
+Reference:
+https://github.com/kubeflow/pytorch-operator/blob/master/examples/mnist/mnist.py
+"""
\ No newline at end of file
diff --git a/dev-support/examples/mnist-pytorch/DDP/post.sh b/dev-support/examples/mnist-pytorch/DDP/post.sh
new file mode 100755
index 0000000..ec2d0cf
--- /dev/null
+++ b/dev-support/examples/mnist-pytorch/DDP/post.sh
@@ -0,0 +1,42 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+curl -X POST -H "Content-Type: application/json" -d '
+{
+  "meta": {
+    "name": "ddp-example",
+    "namespace": "default",
+    "framework": "PyTorch",
+    "cmd": "python /opt/mnist_distributed.py",
+    "envVars": {
+      "ENV_1": "ENV1"
+    }
+  },
+  "environment": {
+    "image": "ddp:0.6.0-SNAPSHOT"
+  },
+  "spec": {
+    "Master": {
+      "replicas": 1,
+      "resources": "cpu=1,memory=128M"
+    },
+    "Worker": {
+      "replicas": 3,
+      "resources": "cpu=1,memory=128M"
+    }
+  }
+}
+' http://127.0.0.1:32080/api/v1/experiment
\ No newline at end of file
diff --git a/dev-support/examples/mnist-pytorch/DDP/readme.md b/dev-support/examples/mnist-pytorch/DDP/readme.md
new file mode 100644
index 0000000..ffa145c
--- /dev/null
+++ b/dev-support/examples/mnist-pytorch/DDP/readme.md
@@ -0,0 +1,25 @@
+# PyTorch DistributedDataParallel (DDP) Example
+
+## Usage
+
+This is a simple MNIST example that shows how to train a distributed PyTorch model with the DistributedDataParallel (DDP) strategy and how to track metrics and parameters with the Submarine SDK.
+
+## How to execute
+
+0. Set up (needed only once per terminal session)
+
+```bash
+eval $(minikube -p minikube docker-env)
+```
+
+1. Build the docker image
+
+```bash
+./dev-support/examples/mnist-pytorch/DDP/build.sh
+```
+
+2. Submit a post request
+
+```bash
+./dev-support/examples/mnist-pytorch/DDP/post.sh
+```
diff --git a/dev-support/examples/nn-pytorch/model.py b/dev-support/examples/nn-pytorch/model.py
index 6080c4a..10ef94b 100644
--- a/dev-support/examples/nn-pytorch/model.py
+++ b/dev-support/examples/nn-pytorch/model.py
@@ -32,4 +32,4 @@ class LinearNNModel(torch.nn.Module):
 if __name__ == "__main__":
     client = ModelsClient()
     net = LinearNNModel()
-    client.save_model(model_type = "pytorch", model = net, artifact_path="pytorch-nn-model", registered_model_name="simple-nn-model")
\ No newline at end of file
+    client.save_model(model_type = "pytorch", model = net, artifact_path="pytorch-nn-model", registered_model_name="simple-nn-model")
diff --git a/submarine-sdk/pysubmarine/submarine/models/utils.py b/submarine-sdk/pysubmarine/submarine/models/utils.py
index 20bb277..d8281a0 100644
--- a/submarine-sdk/pysubmarine/submarine/models/utils.py
+++ b/submarine-sdk/pysubmarine/submarine/models/utils.py
@@ -68,10 +68,7 @@ def get_worker_index():
     # Get PyTorch worker index
     elif env.get_env(_RANK) is not None:
         rank = env.get_env(_RANK)
-        if rank == "0":
-            worker_index = "master-0"
-        else:
-            worker_index = "worker-" + rank
+        worker_index = "worker-" + rank
     # Set worker index to "worker-0" When running local training
     else:
         worker_index = "worker-0"

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
