This is an automated email from the ASF dual-hosted git repository.

haibin pushed a commit to branch v1.6.x
in repository https://gitbox.apache.org/repos/asf/incubator-mxnet.git


The following commit(s) were added to refs/heads/v1.6.x by this push:
     new c675520  [BUGFIX] Fix race condition in kvstore.pushpull (#17007) 
(#17052)
c675520 is described below

commit c6755208f4f78d9f4ea095ec2ed8e067c8db1ef1
Author: Przemyslaw Tredak <ptre...@nvidia.com>
AuthorDate: Wed Dec 11 21:11:57 2019 -0800

    [BUGFIX] Fix race condition in kvstore.pushpull (#17007) (#17052)
    
    * add back gluon test
    
    * fix typo
    
    * change back gpu ctx
    
    * also handle the case where some are pull and some are pushpull
    
    * fix typo
---
 src/kvstore/kvstore_dist_server.h         | 35 +++++++++++++++++++++----------
 tests/nightly/dist_device_sync_kvstore.py | 35 +++++++++++++++++--------------
 2 files changed, 43 insertions(+), 27 deletions(-)

diff --git a/src/kvstore/kvstore_dist_server.h 
b/src/kvstore/kvstore_dist_server.h
index 65ded79..1dc222c 100644
--- a/src/kvstore/kvstore_dist_server.h
+++ b/src/kvstore/kvstore_dist_server.h
@@ -364,21 +364,34 @@ class KVStoreDistServer {
       if (log_verbose_)  {
         LOG(INFO) << "sent response to " << update_buf->request.size() << " 
workers";
       }
+      /**
+       * Request can be for either push, pull or pushpull
+       * If pull flag is set, respond immediately with the updated values
+       * Otherwise, only send the notification
+       */
+      bool has_pull = false;
       for (const auto& req : update_buf->request) {
-        /**
-         * Request can be for either push, pull or pushpull
-         * If pull flag is set, respond immediately with the updated values
-         * Otherwise, only send the notification
-         */
-        if (req.pull) {
-          DefaultStorageResponse(type, key, req, req_data, server);
-        } else {
+        has_pull = has_pull || req.pull;
+      }
+      if (has_pull) {
+        // if there is a pull request, perform WaitToRead() once before 
DefaultStorageResponse
+        if (has_multi_precision_copy(type)) CopyFromTo(stored, store_[key]);
+        stored.WaitToRead();
+        for (const auto& req : update_buf->request) {
+          if (req.pull) {
+            DefaultStorageResponse(type, key, req, req_data, server);
+          }
+        }
+        update_buf->request.clear();
+      } else {
+        // otherwise, send response directly
+        for (const auto& req : update_buf->request) {
           server->Response(req);
         }
+        update_buf->request.clear();
+        if (has_multi_precision_copy(type)) CopyFromTo(stored, store_[key]);
+        stored.WaitToRead();
       }
-      update_buf->request.clear();
-      if (has_multi_precision_copy(type)) CopyFromTo(stored, store_[key]);
-      stored.WaitToRead();
     } else {
       update_buf->merged.WaitToRead();
     }
diff --git a/tests/nightly/dist_device_sync_kvstore.py 
b/tests/nightly/dist_device_sync_kvstore.py
index dc2c7bc..f3fe737 100644
--- a/tests/nightly/dist_device_sync_kvstore.py
+++ b/tests/nightly/dist_device_sync_kvstore.py
@@ -44,7 +44,10 @@ kv = mx.kv.create('dist_device_sync')
 def init_kv():
     # init kv dns keys
     kv.init(keys, [mx.nd.ones(shape)] * len(keys))
+    kv.init('9', mx.nd.ones(shape))
+    kv.init('10', mx.nd.ones(shape))
     kv.init('99', mx.nd.ones(big_shape))
+    kv.init('100', mx.nd.ones(big_shape))
     # worker info
     my_rank = kv.rank
     nworker = kv.num_workers
@@ -55,33 +58,30 @@ def init_kv():
 def test_sync_push_pull():
     kv, my_rank, nworker = init_kv()
     num_gpus = 2
-    def check_default_keys(kv, my_rank, nworker, nrepeat=3, offset=0, 
use_pushpull=False):
+    def check_default_keys(kv, my_rank, nworker, nrepeat=3):
         # checks pull after push in loop, because behavior during
         # consecutive pushes doesn't offer any guarantees
-        for i in range(offset, nrepeat):
+        for i in range(nrepeat):
             scale = my_rank + 1
             num = (nworker + 1) * nworker * rate * num_gpus / 2 * (i + 1) + 1
 
             arr = [mx.nd.ones(shape, ctx=mx.gpu(j)) * scale for j in 
range(num_gpus)]
             val = mx.nd.zeros(shape)
-            if use_pushpull:
-                kv.pushpull('3', arr, out=val)
-            else:
-                kv.push('3', arr)
-                kv.pull('3', out=val)
+            kv.push('9', arr)
+            kv.pull('9', out=val)
+            check_diff_to_scalar(val, num)
+            kv.pushpull('10', arr, out=val)
             check_diff_to_scalar(val, num)
 
             big_arr = [mx.nd.ones(big_shape, ctx=mx.gpu(j)) * scale for j in 
range(num_gpus)]
             big_val = mx.nd.zeros(big_shape)
-            if use_pushpull:
-                kv.pushpull('99', big_arr, out=big_val)
-            else:
-                kv.push('99', big_arr)
-                kv.pull('99', out=big_val)
+            kv.push('99', big_arr)
+            kv.pull('99', out=big_val)
+            check_diff_to_scalar(big_val, num)
+            kv.pushpull('100', big_arr, out=big_val)
             check_diff_to_scalar(big_val, num)
 
-    check_default_keys(kv, my_rank, nworker, nrepeat=3, offset=0, 
use_pushpull=False)
-    check_default_keys(kv, my_rank, nworker, nrepeat=3, offset=3, 
use_pushpull=True)
+    check_default_keys(kv, my_rank, nworker, nrepeat=3)
     print('worker ' + str(my_rank) + ' is done')
 
 def test_sync_init():
@@ -106,10 +106,12 @@ def test_gluon_trainer_type():
         x = params.get('x', shape=(10,1), lr_mult=1.0)
         params.initialize(ctx=[mx.cpu(0), mx.cpu(1)], init='zeros')
         try:
-            trainer = mx.gluon.Trainer(params, 'sgd', {'learning_rate': 0.1}, 
kvstore=kv, update_on_kvstore=update_on_kv)
+            trainer = mx.gluon.Trainer(params, 'sgd', {'learning_rate': 0.1},
+                                       kvstore=kv, 
update_on_kvstore=update_on_kv)
             trainer._init_kvstore()
             assert trainer._kv_initialized
-            assert trainer._update_on_kvstore is True
+            if update_on_kv is not None:
+                assert trainer._update_on_kvstore is update_on_kv
         except ValueError:
             assert update_on_kv is False
 
@@ -122,3 +124,4 @@ def test_gluon_trainer_type():
 if __name__ == "__main__":
     test_sync_init()
     test_sync_push_pull()
+    test_gluon_trainer_type()

Reply via email to