D1568: lfs: using workers in lfs prefetch
wlis added a comment.

  I finally got around to testing this properly and I can reproduce the issue. I looked into the
  code a bit, and it is possible that we create keepalive connections before forking, so the forked
  workers end up illegally multiplexing the same connection. The quick fix on our side is to not
  use workers for the upload action, and it fixes the issue right away. The proper fix would be in
  the https handler, but it doesn't look like an easy one, and I don't think I'll get to it any
  time soon. I'll try to submit a patch that only uses workers for download to mitigate the issue.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1568

To: wlis, #hg-reviewers, quark, mharbison72
Cc: mharbison72, quark, mercurial-devel
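A minimal sketch of the mitigation described above, reusing the names from the D1568 diff
(transfer, objects, action, self.ui); this is not the actual follow-up patch, just one way the
gating could look:

    # sketch only: parallelize downloads, keep uploads in the main process
    # so they can never share a keepalive connection created before fork()
    if action == 'download':
        oids = worker.worker(self.ui, 0.1, transfer, (),
                             sorted(objects, key=lambda o: o.get('oid')))
    else:
        oids = transfer(sorted(objects, key=lambda o: o.get('oid')))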
D1568: lfs: using workers in lfs prefetch
wlis added a comment.

  @mharbison72 Thank you for reporting this issue. We haven't rolled this out to many people yet,
  so we didn't see the issue ourselves. I will try to test the scenario of uploading many large
  files and I'll comment back here soon.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1568

To: wlis, #hg-reviewers, quark, mharbison72
Cc: mharbison72, quark, mercurial-devel
D1718: worker: handle interrupt on windows
This revision was automatically updated to reflect the committed changes.
Closed by commit rHG44fd4cfc6c0a: worker: handle interrupt on windows (authored by wlis).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D1718?vs=4533&id=4540

REVISION DETAIL
  https://phab.mercurial-scm.org/D1718

AFFECTED FILES
  mercurial/worker.py

CHANGE DETAILS

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -282,7 +282,7 @@
                 if t.exception is not None:
                     raise t.exception
                 threads.remove(t)
-    except Exception: # re-raises
+    except (Exception, KeyboardInterrupt): # re-raises
         trykillworkers()
         raise
     while not resultqueue.empty():

To: wlis, #hg-reviewers, krbullock
Cc: krbullock, mercurial-devel
D1718: worker: handle interrupt on windows
wlis created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  After applying the suggestions from https://phab.mercurial-scm.org/D1564 to catch all exceptions
  in the same way, I actually broke the handling of KeyboardInterrupt on windows. The reason is
  that, starting from python 2.5, KeyboardInterrupt doesn't derive from Exception but from
  BaseException: https://docs.python.org/2/library/exceptions.html

TEST PLAN
  Run hg on windows and ctrl-c during a large update. No random exceptions from threads surface in
  the shell. Previously we'd nearly always get stack traces from some of the threads.

  Run tests:

    ./run-tests.py
    [...]
    Failed test-convert-svn-encoding.t: output changed
    Ran 622 tests, 41 skipped, 1 failed.
    python hash seed: 2962682116

  The failing test seems to have nothing to do with this change and fails on the base revision as
  well.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1718

AFFECTED FILES
  mercurial/worker.py

CHANGE DETAILS

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -282,7 +282,7 @@
                 if t.exception is not None:
                     raise t.exception
                 threads.remove(t)
-    except Exception: # re-raises
+    except (Exception, KeyboardInterrupt): # re-raises
         trykillworkers()
         raise
     while not resultqueue.empty():

To: wlis, #hg-reviewers
Cc: mercurial-devel
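The hierarchy point is easy to confirm in an interpreter (Python 2.5+ and all of Python 3):

    >>> # KeyboardInterrupt sits outside the Exception branch on purpose,
    >>> # so a blanket 'except Exception' cannot swallow ctrl-c
    >>> issubclass(KeyboardInterrupt, Exception)
    False
    >>> issubclass(KeyboardInterrupt, BaseException)
    True
    >>> issubclass(Exception, BaseException)
    True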
D1564: worker: make windows workers daemons
wlis added inline comments.

INLINE COMMENTS

> worker.py:285
> +                threads.remove(t)
> +    except Exception: # re-raises
> +        trykillworkers()

  I forgot to update this diff to also catch KeyboardInterrupt, which doesn't derive from
  Exception but from BaseException. I will try to get the one-line patch up asap; until then,
  ctrl-c isn't handled gracefully on windows.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1564

To: wlis, #hg-reviewers, ikostia, durin42
Cc: durin42, durham, ikostia, mercurial-devel
D1568: lfs: using workers in lfs prefetch
This revision was automatically updated to reflect the committed changes.
Closed by commit rHG2b7c0cba308f: lfs: using workers in lfs prefetch (authored by wlis).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D1568?vs=4374&id=4489

REVISION DETAIL
  https://phab.mercurial-scm.org/D1568

AFFECTED FILES
  hgext/lfs/blobstore.py
  tests/test-lfs-test-server.t

CHANGE DETAILS

diff --git a/tests/test-lfs-test-server.t b/tests/test-lfs-test-server.t
--- a/tests/test-lfs-test-server.t
+++ b/tests/test-lfs-test-server.t
@@ -43,6 +43,7 @@
   pushing to ../repo2
   searching for changes
   lfs: uploading 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b (12 bytes)
+  lfs: processed: 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b
   1 changesets found
   uncompressed size of bundle content:
   * (changelog) (glob)
@@ -60,6 +61,7 @@
   resolving manifests
   getting a
   lfs: downloading 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b (12 bytes)
+  lfs: processed: 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b
   1 files updated, 0 files merged, 0 files removed, 0 files unresolved

 When the server has some blobs already

@@ -73,7 +75,9 @@
   searching for changes
   lfs: need to transfer 2 objects (39 bytes)
   lfs: uploading 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19 (20 bytes)
+  lfs: processed: 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19
   lfs: uploading d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998 (19 bytes)
+  lfs: processed: d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998
   1 changesets found
   uncompressed size of bundle content:
   adding changesets
@@ -88,8 +92,10 @@
   getting b
   getting c
   lfs: downloading d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998 (19 bytes)
+  lfs: processed: d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998
   getting d
   lfs: downloading 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19 (20 bytes)
+  lfs: processed: 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19
   3 files updated, 0 files merged, 0 files removed, 0 files unresolved

 Check error message when the remote missed a blob:

diff --git a/hgext/lfs/blobstore.py b/hgext/lfs/blobstore.py
--- a/hgext/lfs/blobstore.py
+++ b/hgext/lfs/blobstore.py
@@ -19,6 +19,7 @@
     url as urlmod,
     util,
     vfs as vfsmod,
+    worker,
 )

 from ..largefiles import lfutil
@@ -205,7 +206,7 @@

         return filteredobjects

-    def _basictransfer(self, obj, action, localstore, progress=None):
+    def _basictransfer(self, obj, action, localstore):
         """Download or upload a single object using basic transfer protocol

         obj: dict, an object description returned by batch API
@@ -223,7 +224,7 @@
         request = util.urlreq.request(href)
         if action == 'upload':
             # If uploading blobs, read data from local blobstore.
-            request.data = filewithprogress(localstore.vfs(oid), progress)
+            request.data = filewithprogress(localstore.vfs(oid), None)
             request.get_method = lambda: 'PUT'

         for k, v in headers:
@@ -236,8 +237,6 @@
                 data = req.read(1048576)
                 if not data:
                     break
-                if action == 'download' and progress:
-                    progress(len(data))
                 response += data
         except util.urlerr.httperror as ex:
             raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)')
@@ -252,45 +251,51 @@
             raise error.ProgrammingError('invalid Git-LFS action: %s' % action)

         response = self._batchrequest(pointers, action)
-        prunningsize = [0]
         objects = self._extractobjects(response, pointers, action)
         total = sum(x.get('size', 0) for x in objects)
+        sizes = {}
+        for obj in objects:
+            sizes[obj.get('oid')] = obj.get('size', 0)
         topic = {'upload': _('lfs uploading'),
                  'download': _('lfs downloading')}[action]
         if self.ui.verbose and len(objects) > 1:
             self.ui.write(_('lfs: need to transfer %d objects (%s)\n')
                           % (len(objects), util.bytecount(total)))
         self.ui.progress(topic, 0, total=total)
-        def progress(size):
-            # advance progress bar by "size" bytes
-            prunningsize[0] += size
-            self.ui.progress(topic, prunningsize[0], total=total)
-        for obj in sorted(objects, key=lambda o: o.get('oid')):
-            objsize = obj.get('size', 0)
+        def transfer(chunk):
+            for obj in chunk:
+                objsize = obj.get('size', 0)
+                if self.ui.verbose:
+                    if action == 'download':
+                        msg = _('lfs: downloading %s (%s)\n')
+
D1460: workers: add config to enable/diable workers
This revision was automatically updated to reflect the committed changes.
Closed by commit rHG9b7ee7b78856: workers: add config to enable/diable workers (authored by wlis).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D1460?vs=4023&id=4487

REVISION DETAIL
  https://phab.mercurial-scm.org/D1460

AFFECTED FILES
  mercurial/configitems.py
  mercurial/help/config.txt
  mercurial/worker.py

CHANGE DETAILS

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -82,7 +82,8 @@

     args - arguments to split into chunks, to pass to individual workers
     '''
-    if worthwhile(ui, costperarg, len(args)):
+    enabled = ui.configbool('worker', 'enabled', True)
+    if enabled and worthwhile(ui, costperarg, len(args)):
         return _platformworker(ui, func, staticargs, args)
     return func(*staticargs + (args,))

diff --git a/mercurial/help/config.txt b/mercurial/help/config.txt
--- a/mercurial/help/config.txt
+++ b/mercurial/help/config.txt
@@ -2563,6 +2563,10 @@
     directory updates in parallel on Unix-like systems, which greatly
     helps performance.

+``enabled``
+    Whether to enable workers code to be used.
+    (default: true)
+
 ``numcpus``
     Number of CPUs to use for parallel operations. A zero or negative
     value is treated as ``use the default``.

diff --git a/mercurial/configitems.py b/mercurial/configitems.py
--- a/mercurial/configitems.py
+++ b/mercurial/configitems.py
@@ -1253,6 +1253,9 @@
 coreconfigitem('worker', 'backgroundclosethreadcount',
     default=4,
 )
+coreconfigitem('worker', 'enabled',
+    default=True,
+)
 coreconfigitem('worker', 'numcpus',
     default=None,
 )

To: wlis, #hg-reviewers, lothiraldan, ikostia, durin42
Cc: lothiraldan, mercurial-devel
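With this in, workers stay enabled by default and turning them off is a one-line config change,
for example in a repository's .hg/hgrc or the user's global hgrc:

    [worker]
    # fall back to the single-process/single-thread code path
    enabled = False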
D1564: worker: make windows workers daemons
This revision was automatically updated to reflect the committed changes.
Closed by commit rHG487e7e6e89a0: worker: make windows workers daemons (authored by wlis).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D1564?vs=4024&id=4488

REVISION DETAIL
  https://phab.mercurial-scm.org/D1564

AFFECTED FILES
  mercurial/worker.py

CHANGE DETAILS

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -12,6 +12,7 @@
 import signal
 import sys
 import threading
+import time

 from .i18n import _
 from . import (
@@ -216,6 +217,7 @@
             self._func = func
             self._staticargs = staticargs
             self._interrupted = False
+            self.daemon = True
             self.exception = None

         def interrupt(self):
@@ -242,16 +244,22 @@
                 raise

     threads = []
-    def killworkers():
+    def trykillworkers():
+        # Allow up to 1 second to clean worker threads nicely
+        cleanupend = time.time() + 1
         for t in threads:
             t.interrupt()
         for t in threads:
-            # try to let the threads handle interruption, but don't wait
-            # indefintely. the thread could be in infinite loop, handling
-            # a very long task or in a deadlock situation
-            t.join(5)
+            remainingtime = cleanupend - time.time()
+            t.join(remainingtime)
             if t.is_alive():
-                raise error.Abort(_('failed to join worker thread'))
+                # pass over the workers joining failure. it is more
+                # important to surface the inital exception than the
+                # fact that one of workers may be processing a large
+                # task and does not get to handle the interruption.
+                ui.warn(_("failed to kill worker threads while "
+                          "handling an exception\n"))
+                return

     workers = _numworkers(ui)
     resultqueue = util.queue()
@@ -264,25 +272,19 @@
         t = Worker(taskqueue, resultqueue, func, staticargs)
         threads.append(t)
         t.start()
-
-    while len(threads) > 0:
-        while not resultqueue.empty():
-            yield resultqueue.get()
-        threads[0].join(0.05)
-        finishedthreads = [_t for _t in threads if not _t.is_alive()]
-        for t in finishedthreads:
-            if t.exception is not None:
-                try:
-                    killworkers()
-                except Exception:
-                    # pass over the workers joining failure. it is more
-                    # important to surface the inital exception than the
-                    # fact that one of workers may be processing a large
-                    # task and does not get to handle the interruption.
-                    ui.warn(_("failed to kill worker threads while handling "
-                              "an exception"))
-                raise t.exception
-            threads.remove(t)
+    try:
+        while len(threads) > 0:
+            while not resultqueue.empty():
+                yield resultqueue.get()
+            threads[0].join(0.05)
+            finishedthreads = [_t for _t in threads if not _t.is_alive()]
+            for t in finishedthreads:
+                if t.exception is not None:
+                    raise t.exception
+                threads.remove(t)
+    except Exception: # re-raises
+        trykillworkers()
+        raise
     while not resultqueue.empty():
         yield resultqueue.get()

To: wlis, #hg-reviewers, ikostia, durin42
Cc: durin42, durham, ikostia, mercurial-devel
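Outside of Mercurial, the pattern this commit lands on (daemon worker threads, cooperative
interruption, and a time-bounded join) can be reduced to a standalone sketch; this is not the hg
code, just the same idea in isolation:

    import threading
    import time

    class CooperativeWorker(threading.Thread):
        def __init__(self, work):
            threading.Thread.__init__(self)
            self.daemon = True           # never blocks process exit
            self._work = work
            self._interrupted = False

        def interrupt(self):
            self._interrupted = True     # polled cooperatively in run()

        def run(self):
            for seconds in self._work:
                if self._interrupted:
                    return
                time.sleep(seconds)      # stand-in for real per-item work

    def trykillworkers(threads, grace=1.0):
        # shared deadline: all joins together get at most 'grace' seconds
        deadline = time.time() + grace
        for t in threads:
            t.interrupt()
        for t in threads:
            t.join(max(0.0, deadline - time.time()))
            if t.is_alive():
                return False             # caller re-raises the original error
        return True

    threads = [CooperativeWorker([0.01] * 50) for _ in range(4)]
    for t in threads:
        t.start()
    print(trykillworkers(threads))       # True: workers noticed the flag in time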
D1459: workers: handling exceptions in windows workers
This revision was automatically updated to reflect the committed changes.
Closed by commit rHG71427ff1dff8: workers: handling exceptions in windows workers (authored by wlis).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D1459?vs=3666&id=4486

REVISION DETAIL
  https://phab.mercurial-scm.org/D1459

AFFECTED FILES
  mercurial/worker.py

CHANGE DETAILS

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -214,18 +214,45 @@
             self._resultqueue = resultqueue
             self._func = func
             self._staticargs = staticargs
+            self._interrupted = False
+            self.exception = None
+
+        def interrupt(self):
+            self._interrupted = True

         def run(self):
-            while not self._taskqueue.empty():
-                try:
-                    args = self._taskqueue.get_nowait()
-                    for res in self._func(*self._staticargs + (args,)):
-                        self._resultqueue.put(res)
-                except util.empty:
-                    break
+            try:
+                while not self._taskqueue.empty():
+                    try:
+                        args = self._taskqueue.get_nowait()
+                        for res in self._func(*self._staticargs + (args,)):
+                            self._resultqueue.put(res)
+                        # threading doesn't provide a native way to
+                        # interrupt execution. handle it manually at every
+                        # iteration.
+                        if self._interrupted:
+                            return
+                    except util.empty:
+                        break
+            except Exception as e:
+                # store the exception such that the main thread can resurface
+                # it as if the func was running without workers.
+                self.exception = e
+                raise
+
+    threads = []
+    def killworkers():
+        for t in threads:
+            t.interrupt()
+        for t in threads:
+            # try to let the threads handle interruption, but don't wait
+            # indefintely. the thread could be in infinite loop, handling
+            # a very long task or in a deadlock situation
+            t.join(5)
+            if t.is_alive():
+                raise error.Abort(_('failed to join worker thread'))

     workers = _numworkers(ui)
-    threads = []
     resultqueue = util.queue()
     taskqueue = util.queue()
     # partition work to more pieces than workers to minimize the chance
@@ -236,12 +263,24 @@
         t = Worker(taskqueue, resultqueue, func, staticargs)
         threads.append(t)
         t.start()
-    while any(t.is_alive() for t in threads):
+
+    while len(threads) > 0:
         while not resultqueue.empty():
             yield resultqueue.get()
-        t = threads[0]
-        t.join(0.05)
-        if not t.is_alive():
+        threads[0].join(0.05)
+        finishedthreads = [_t for _t in threads if not _t.is_alive()]
+        for t in finishedthreads:
+            if t.exception is not None:
+                try:
+                    killworkers()
+                except Exception:
+                    # pass over the workers joining failure. it is more
+                    # important to surface the inital exception than the
+                    # fact that one of workers may be processing a large
+                    # task and does not get to handle the interruption.
+                    ui.warn(_("failed to kill worker threads while handling "
+                              "an exception"))
+                raise t.exception
             threads.remove(t)
     while not resultqueue.empty():
         yield resultqueue.get()

To: wlis, #hg-reviewers, durin42
Cc: mercurial-devel
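The core trick here, storing a worker's exception on the thread object so the spawning thread can
re-raise it, works in isolation too; a minimal standalone sketch:

    import threading

    class ExcThread(threading.Thread):
        """Capture an exception from run() for the spawner to re-raise."""
        def __init__(self, target):
            threading.Thread.__init__(self)
            self._target_fn = target
            self.exception = None

        def run(self):
            try:
                self._target_fn()
            except Exception as e:
                # threads cannot propagate exceptions on their own; stash it
                self.exception = e

    t = ExcThread(lambda: 1 // 0)
    t.start()
    t.join()
    if t.exception is not None:
        raise t.exception   # ZeroDivisionError surfaces in the main thread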
D1458: workers: implemented worker on windows
This revision was automatically updated to reflect the committed changes.
Closed by commit rHG02b36e860e0b: workers: implemented worker on windows (authored by wlis).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D1458?vs=3665&id=4485

REVISION DETAIL
  https://phab.mercurial-scm.org/D1458

AFFECTED FILES
  mercurial/worker.py

CHANGE DETAILS

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -11,6 +11,7 @@
 import os
 import signal
 import sys
+import threading

 from .i18n import _
 from . import (
@@ -53,7 +54,7 @@
         raise error.Abort(_('number of cpus must be an integer'))
     return min(max(countcpus(), 4), 32)

-if pycompat.isposix:
+if pycompat.isposix or pycompat.iswindows:
     _startupcost = 0.01
 else:
     _startupcost = 1e30
@@ -203,7 +204,51 @@
     elif os.WIFSIGNALED(code):
         return -os.WTERMSIG(code)

-if not pycompat.iswindows:
+def _windowsworker(ui, func, staticargs, args):
+    class Worker(threading.Thread):
+        def __init__(self, taskqueue, resultqueue, func, staticargs,
+                     group=None, target=None, name=None, verbose=None):
+            threading.Thread.__init__(self, group=group, target=target,
+                                      name=name, verbose=verbose)
+            self._taskqueue = taskqueue
+            self._resultqueue = resultqueue
+            self._func = func
+            self._staticargs = staticargs
+
+        def run(self):
+            while not self._taskqueue.empty():
+                try:
+                    args = self._taskqueue.get_nowait()
+                    for res in self._func(*self._staticargs + (args,)):
+                        self._resultqueue.put(res)
+                except util.empty:
+                    break
+
+    workers = _numworkers(ui)
+    threads = []
+    resultqueue = util.queue()
+    taskqueue = util.queue()
+    # partition work to more pieces than workers to minimize the chance
+    # of uneven distribution of large tasks between the workers
+    for pargs in partition(args, workers * 20):
+        taskqueue.put(pargs)
+    for _i in range(workers):
+        t = Worker(taskqueue, resultqueue, func, staticargs)
+        threads.append(t)
+        t.start()
+    while any(t.is_alive() for t in threads):
+        while not resultqueue.empty():
+            yield resultqueue.get()
+        t = threads[0]
+        t.join(0.05)
+        if not t.is_alive():
+            threads.remove(t)
+    while not resultqueue.empty():
+        yield resultqueue.get()
+
+if pycompat.iswindows:
+    _platformworker = _windowsworker
+else:
     _platformworker = _posixworker
     _exitstatus = _posixexitstatus

To: wlis, #hg-reviewers
Cc: durin42, indygreg, mercurial-devel
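From a caller's perspective nothing changes between platforms; a hedged usage sketch, where
doitems and names are hypothetical (the real call site in this era is merge.applyupdates):

    # func is a generator invoked as func(*(staticargs + (chunk,))); on the
    # posix fork-based backend each yielded pair crosses a pipe, so small
    # (int, string) pairs are the safe shape for results
    def doitems(ui, items):
        for item in items:
            yield 1, item            # "one more item done", plus its name

    count = 0
    for done, name in worker.worker(ui, 0.001, doitems, (ui,), names):
        count += done                # aggregate progress in the parent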
D1457: workers: don't use backgroundfilecloser in threads
This revision was automatically updated to reflect the committed changes.
Closed by commit rHG60f2a215faa7: workers: don't use backgroundfilecloser in threads (authored by wlis).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D1457?vs=4386&id=4484

REVISION DETAIL
  https://phab.mercurial-scm.org/D1457

AFFECTED FILES
  mercurial/vfs.py

CHANGE DETAILS

diff --git a/mercurial/vfs.py b/mercurial/vfs.py
--- a/mercurial/vfs.py
+++ b/mercurial/vfs.py
@@ -277,8 +277,12 @@
         to ``__call__``/``open`` to result in the file possibly being
         closed asynchronously, on a background thread.
         """
-        # This is an arbitrary restriction and could be changed if we ever
-        # have a use case.
+        # Sharing backgroundfilecloser between threads is complex and using
+        # multiple instances puts us at risk of running out of file descriptors
+        # only allow to use backgroundfilecloser when in main thread.
+        if not isinstance(threading.currentThread(), threading._MainThread):
+            yield
+            return
         vfs = getattr(self, 'vfs', self)
         if getattr(vfs, '_backgroundfilecloser', None):
             raise error.Abort(
@@ -413,7 +417,8 @@
                               ' valid for checkambig=True') % mode)
                 fp = checkambigatclosing(fp)

-            if backgroundclose:
+            if (backgroundclose and
+                    isinstance(threading.currentThread(), threading._MainThread)):
                 if not self._backgroundfilecloser:
                     raise error.Abort(_('backgroundclose can only be used when a '
                                         'backgroundclosing context manager is active')

To: wlis, #hg-reviewers, indygreg, krbullock
Cc: krbullock, durin42, indygreg, mercurial-devel
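The main-thread test used in the patch is plain stdlib; a tiny demonstration (Python 3 code would
normally compare against threading.main_thread() rather than the private _MainThread class):

    import threading

    def inmainthread():
        # the same check the vfs.py change relies on
        return isinstance(threading.currentThread(), threading._MainThread)

    print(inmainthread())    # True

    t = threading.Thread(target=lambda: print(inmainthread()))   # prints False
    t.start()
    t.join()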
D1457: workers: don't use backgroundfilecloser in threads
wlis updated this revision to Diff 4386.

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D1457?vs=4022&id=4386

REVISION DETAIL
  https://phab.mercurial-scm.org/D1457

AFFECTED FILES
  mercurial/vfs.py

CHANGE DETAILS

diff --git a/mercurial/vfs.py b/mercurial/vfs.py
--- a/mercurial/vfs.py
+++ b/mercurial/vfs.py
@@ -277,8 +277,12 @@
         to ``__call__``/``open`` to result in the file possibly being
         closed asynchronously, on a background thread.
         """
-        # This is an arbitrary restriction and could be changed if we ever
-        # have a use case.
+        # Sharing backgroundfilecloser between threads is complex and using
+        # multiple instances puts us at risk of running out of file descriptors
+        # only allow to use backgroundfilecloser when in main thread.
+        if not isinstance(threading.currentThread(), threading._MainThread):
+            yield
+            return
         vfs = getattr(self, 'vfs', self)
         if getattr(vfs, '_backgroundfilecloser', None):
             raise error.Abort(
@@ -413,7 +417,8 @@
                               ' valid for checkambig=True') % mode)
                 fp = checkambigatclosing(fp)

-            if (backgroundclose and
+            if (backgroundclose and
+                    isinstance(threading.currentThread(), threading._MainThread)):
                 if not self._backgroundfilecloser:
                     raise error.Abort(_('backgroundclose can only be used when a '
                                         'backgroundclosing context manager is active')

To: wlis, #hg-reviewers, indygreg, krbullock
Cc: krbullock, durin42, indygreg, mercurial-devel
D1458: workers: implemented worker on windows
wlis added a comment.

  @durin42 yes, I tested without remotefilelog (at least I believe it was not being used at that
  time). I cloned a repo with --config extensions.remotefilelog=! and then put the appropriate
  section in .hg/hgrc. I ran updates between distant revisions and verified that threads get
  started during the update. Hg didn't complain about anything and the repo stayed healthy.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1458

To: wlis, #hg-reviewers
Cc: durin42, indygreg, mercurial-devel
D1568: lfs: using workers in lfs prefetch
wlis added a comment.

  @mharbison72 I am not sure these tests are able to satisfy the conditions to actually
  multithread. But you are right that if there is an issue, we can force 1 worker. The workers on
  posix are implemented by forking, and the only way of communicating with them is through pipes
  created by the worker.py code. Once forked, they only report back after some number of tasks
  (file fetches in this case) has been finished by the worker (I think ~100, but I'm not sure). We
  would have to change the POSIX behaviour to allow reporting smaller pieces of progress through
  the pipe (potentially 0 tasks finished). This would need changes in a bunch of layers (worker,
  merge, blobstore) instead of the current simple use of the progress(...) function. It is
  possible to implement that communication, but it is a significant amount of work and testing.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1568

To: wlis, #hg-reviewers, quark, mharbison72
Cc: mharbison72, quark, mercurial-devel
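This is why the committed version of D1568 settles for per-file granularity: each worker yields
one (1, oid) pair per finished transfer, and the parent advances the progress bar by the size it
already knows for that oid. Condensed from the committed diff (sizes, topic, and total are
precomputed in blobstore._batch):

    oids = worker.worker(self.ui, 0.1, transfer, (),
                         sorted(objects, key=lambda o: o.get('oid')))
    processed = 0
    for _one, oid in oids:
        # one pipe message per finished file, not per downloaded chunk
        processed += sizes[oid]
        self.ui.progress(topic, processed, total=total)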
D1568: lfs: using workers in lfs prefetch
wlis marked an inline comment as done.
wlis added a comment.

  Updated the test (as my changes change the output) and retested. Now everything works fine.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1568

To: wlis, #hg-reviewers, quark, mharbison72
Cc: mharbison72, quark, mercurial-devel
D1568: lfs: using workers in lfs prefetch
wlis updated this revision to Diff 4374.
wlis edited the test plan for this revision.

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D1568?vs=4025&id=4374

REVISION DETAIL
  https://phab.mercurial-scm.org/D1568

AFFECTED FILES
  hgext/lfs/blobstore.py
  tests/test-lfs-test-server.t

CHANGE DETAILS

diff --git a/tests/test-lfs-test-server.t b/tests/test-lfs-test-server.t
--- a/tests/test-lfs-test-server.t
+++ b/tests/test-lfs-test-server.t
@@ -43,6 +43,7 @@
   pushing to ../repo2
   searching for changes
   lfs: uploading 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b (12 bytes)
+  lfs: processed: 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b
   1 changesets found
   uncompressed size of bundle content:
   * (changelog) (glob)
@@ -60,6 +61,7 @@
   resolving manifests
   getting a
   lfs: downloading 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b (12 bytes)
+  lfs: processed: 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b
   1 files updated, 0 files merged, 0 files removed, 0 files unresolved

 When the server has some blobs already

@@ -73,7 +75,9 @@
   searching for changes
   lfs: need to transfer 2 objects (39 bytes)
   lfs: uploading 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19 (20 bytes)
+  lfs: processed: 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19
   lfs: uploading d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998 (19 bytes)
+  lfs: processed: d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998
   1 changesets found
   uncompressed size of bundle content:
   adding changesets
@@ -88,8 +92,10 @@
   getting b
   getting c
   lfs: downloading d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998 (19 bytes)
+  lfs: processed: d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998
   getting d
   lfs: downloading 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19 (20 bytes)
+  lfs: processed: 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19
   3 files updated, 0 files merged, 0 files removed, 0 files unresolved

 Check error message when the remote missed a blob:

diff --git a/hgext/lfs/blobstore.py b/hgext/lfs/blobstore.py
--- a/hgext/lfs/blobstore.py
+++ b/hgext/lfs/blobstore.py
@@ -19,6 +19,7 @@
     url as urlmod,
     util,
     vfs as vfsmod,
+    worker,
 )

 from ..largefiles import lfutil
@@ -205,7 +206,7 @@

         return filteredobjects

-    def _basictransfer(self, obj, action, localstore, progress=None):
+    def _basictransfer(self, obj, action, localstore):
         """Download or upload a single object using basic transfer protocol

         obj: dict, an object description returned by batch API
@@ -223,7 +224,7 @@
         request = util.urlreq.request(href)
         if action == 'upload':
             # If uploading blobs, read data from local blobstore.
-            request.data = filewithprogress(localstore.vfs(oid), progress)
+            request.data = filewithprogress(localstore.vfs(oid), None)
             request.get_method = lambda: 'PUT'

         for k, v in headers:
@@ -236,8 +237,6 @@
                 data = req.read(1048576)
                 if not data:
                     break
-                if action == 'download' and progress:
-                    progress(len(data))
                 response += data
         except util.urlerr.httperror as ex:
             raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)')
@@ -252,45 +251,51 @@
             raise error.ProgrammingError('invalid Git-LFS action: %s' % action)

         response = self._batchrequest(pointers, action)
-        prunningsize = [0]
         objects = self._extractobjects(response, pointers, action)
         total = sum(x.get('size', 0) for x in objects)
+        sizes = {}
+        for obj in objects:
+            sizes[obj.get('oid')] = obj.get('size', 0)
         topic = {'upload': _('lfs uploading'),
                  'download': _('lfs downloading')}[action]
         if self.ui.verbose and len(objects) > 1:
             self.ui.write(_('lfs: need to transfer %d objects (%s)\n')
                           % (len(objects), util.bytecount(total)))
         self.ui.progress(topic, 0, total=total)
-        def progress(size):
-            # advance progress bar by "size" bytes
-            prunningsize[0] += size
-            self.ui.progress(topic, prunningsize[0], total=total)
-        for obj in sorted(objects, key=lambda o: o.get('oid')):
-            objsize = obj.get('size', 0)
+        def transfer(chunk):
+            for obj in chunk:
+                objsize = obj.get('size', 0)
+                if self.ui.verbose:
+                    if action == 'download':
+                        msg = _('lfs: downloading %s (%s)\n')
+                    elif action == 'upload':
+                        msg = _('lfs: uploading %s (%s)\n')
+
D1568: lfs: using workers in lfs prefetch
wlis added a comment.

  I must have messed up something when running the tests previously, probably a wrong revision.
  The tests actually catch the failure above:

    [wlis@dev9680.prn1 ~/hg-committed/tests] ./run-tests.py -l test-lfs-test-server.t

    --- /home/wlis/hg-committed/tests/test-lfs-test-server.t
    +++ /home/wlis/hg-committed/tests/test-lfs-test-server.t.err
    @@ -43,54 +43,98 @@
       pushing to ../repo2
       searching for changes
       lfs: uploading 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b (12 bytes)
    -  1 changesets found
    -  uncompressed size of bundle content:
    -  * (changelog) (glob)
    -  * (manifests) (glob)
    -  * a (glob)
    -  adding changesets
    -  adding manifests
    -  adding file changes
    -  added 1 changesets with 1 changes to 1 files
    +  lfs: failed: TypeError("object of type 'file' has no len()",) (remaining retry 5)
    +  lfs: failed: TypeError("object of type 'file' has no len()",) (remaining retry 4)
    +  lfs: failed: TypeError("object of type 'file' has no len()",) (remaining retry 3)
    +  lfs: failed: TypeError("object of type 'file' has no len()",) (remaining retry 2)
    +  lfs: failed: TypeError("object of type 'file' has no len()",) (remaining retry 1)
    +  ** unknown exception encountered, please report by visiting
    +  ** https://mercurial-scm.org/wiki/BugTracker
    +  ** Python 2.7.5 (default, Aug 4 2017, 00:39:18) [GCC 4.8.5 20150623 (Red Hat 4.8.5-16)]
    +  ** Mercurial Distributed SCM (version 4.4.1+203-4da86512789c)
    +  ** Extensions loaded: lfs
    +  Traceback (most recent call last):
    +    File "/home/wlis/hg-committed/hg", line 41, in <module>
    +      dispatch.run()
    +    File "/home/wlis/hg-committed/mercurial/dispatch.py", line 88, in run
    +      status = (dispatch(req) or 0) & 255
    +    File "/home/wlis/hg-committed/mercurial/dispatch.py", line 177, in dispatch
    +      ret = _runcatch(req)
    +    File "/home/wlis/hg-committed/mercurial/dispatch.py", line 318, in _runcatch
    +      return _callcatch(ui, _runcatchfunc)
    +    File "/home/wlis/hg-committed/mercurial/dispatch.py", line 326, in _callcatch
    +      return scmutil.callcatch(ui, func)
    +    File "/home/wlis/hg-committed/mercurial/scmutil.py", line 154, in callcatch
    +      return func()
    +    File "/home/wlis/hg-committed/mercurial/dispatch.py", line 308, in _runcatchfunc
    +      return _dispatch(req)
    +    File "/home/wlis/hg-committed/mercurial/dispatch.py", line 912, in _dispatch
    +      cmdpats, cmdoptions)
    +    File "/home/wlis/hg-committed/mercurial/dispatch.py", line 667, in runcommand
    +      ret = _runcommand(ui, options, cmd, d)
    +    File "/home/wlis/hg-committed/mercurial/dispatch.py", line 920, in _runcommand
    +      return cmdfunc()
    +    File "/home/wlis/hg-committed/mercurial/dispatch.py", line 909, in <lambda>
    +      d = lambda: util.checksignature(func)(ui, *args, **strcmdopt)
    +    File "/home/wlis/hg-committed/mercurial/util.py", line 1188, in check
    +      return func(*args, **kwargs)
    +    File "/home/wlis/hg-committed/mercurial/commands.py", line 4160, in push
    +      opargs=opargs)
    +    File "/home/wlis/hg-committed/mercurial/exchange.py", line 475, in push
    +      _pushbundle2(pushop)
    +    File "/home/wlis/hg-committed/mercurial/exchange.py", line 1023, in _pushbundle2
    +      ret = partgen(pushop, bundler)
    +    File "/home/wlis/hg-committed/mercurial/exchange.py", line 797, in _pushb2ctx
    +      pushop.repo.prepushoutgoinghooks(pushop)
    +    File "/home/wlis/hg-committed/mercurial/util.py", line 3125, in __call__
    +      results.append(hook(*args))
    +    File "/home/wlis/hg-committed/hgext/lfs/wrapper.py", line 263, in prepush
    +      return uploadblobsfromrevs(pushop.repo, pushop.outgoing.missing)
    +    File "/home/wlis/hg-committed/hgext/lfs/wrapper.py", line 254, in uploadblobsfromrevs
    +      uploadblobs(repo, pointers)
    +    File "/home/wlis/hg-committed/hgext/lfs/wrapper.py", line 306, in uploadblobs
    +      remoteblob.writebatch(pointers, repo.svfs.lfslocalblobstore)
    +    File "/home/wlis/hg-committed/hgext/lfs/blobstore.py", line 133, in writebatch
    +      self._batch(pointers, fromstore, 'upload')
    +    File "/home/wlis/hg-committed/hgext/lfs/blobstore.py", line 294, in _batch
    +      for _one, oid in oids:
    +    File "/home/wlis/hg-committed/hgext/lfs/blobstore.py", line 278, in transfer
    +      self._basictransfer(obj, action, localstore)
    +    File "/home/wlis/hg-committed/hgext/lfs/blobstore.py", line 235, in _basictransfer
    +      req = self.urlopener.open(request)
    +    File "/usr/lib64/python2.7/urllib2.py", line 429, in open
    +      req = meth(req)
    +    File "/usr/lib64/python2.7/urllib2.py", line 1152, in do_request_
    +      'Content-length', '%d' % len(data))
    +  TypeError: object of type 'f
D1568: lfs: using workers in lfs prefetch
wlis planned changes to this revision.
wlis added a comment.

  @mharbison72 you are right, the upload doesn't work, because I removed the filewithprogress
  wrapper around the file without realizing that it also provides a couple of functions, including
  __len__. Will fix very soon.

INLINE COMMENTS

> blobstore.py:193
>          # If uploading blobs, read data from local blobstore.
> -        request.data = filewithprogress(localstore.vfs(oid), progress)
> +        request.data = localstore.vfs(oid)
>          request.get_method = lambda: 'PUT'

  this line is at fault

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1568

To: wlis, #hg-reviewers, quark, mharbison72
Cc: mharbison72, quark, mercurial-devel
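For context, urllib2 derives the Content-Length header via len(request.data), which is exactly
the call that fails in the traceback above. A minimal sketch of a wrapper that restores that
contract, assuming the blob is a regular on-disk file (the real filewithprogress in blobstore.py
plays this role, among others):

    import os

    class filewithlen(object):
        """File wrapper exposing __len__ so urllib2 can set Content-Length."""
        def __init__(self, fp):
            self._fp = fp
            self._len = os.fstat(fp.fileno()).st_size   # assumes a real file

        def __len__(self):
            return self._len

        def read(self, size=-1):
            return self._fp.read(size)

        def close(self):
            self._fp.close()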
D1568: lfs: using workers in lfs prefetch
wlis requested review of this revision.
wlis added a comment.

  Tested with the server:

    [wlis@dev9680.prn1 ~/hg-committed/tests] ./run-tests.py -l test-lfs*
    # Ran 4 tests, 0 skipped, 0 failed.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1568

To: wlis, #hg-reviewers, quark
Cc: mharbison72, quark, mercurial-devel
D1568: lfs: using workers in lfs prefetch
wlis created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  This significantly speeds up lfs prefetch. With a fast network we are seeing ~50% improvement of
  overall prefetch times.

  Because of the worker API on posix we do lose fine-grained progress updates and only see
  progress when a file has finished downloading.

TEST PLAN
  Run tests:

    ./run-tests.py -l test-lfs*
    ..s.
    Skipped test-lfs-test-server.t: missing feature: git-lfs test server
    Ran 3 tests, 1 skipped, 0 failed.

  Run commands resulting in lfs prefetch, e.g. hg sparse --enable-profile

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1568

AFFECTED FILES
  hgext/lfs/blobstore.py

CHANGE DETAILS

diff --git a/hgext/lfs/blobstore.py b/hgext/lfs/blobstore.py
--- a/hgext/lfs/blobstore.py
+++ b/hgext/lfs/blobstore.py
@@ -18,6 +18,7 @@
     url as urlmod,
     util,
     vfs as vfsmod,
+    worker,
 )

 # 64 bytes for SHA256
@@ -171,7 +172,7 @@

         return filteredobjects

-    def _basictransfer(self, obj, action, localstore, progress=None):
+    def _basictransfer(self, obj, action, localstore):
         """Download or upload a single object using basic transfer protocol

         obj: dict, an object description returned by batch API
@@ -189,7 +190,7 @@
         request = util.urlreq.request(href)
         if action == 'upload':
             # If uploading blobs, read data from local blobstore.
-            request.data = filewithprogress(localstore.vfs(oid), progress)
+            request.data = localstore.vfs(oid)
             request.get_method = lambda: 'PUT'

         for k, v in headers:
@@ -202,8 +203,6 @@
                 data = req.read(1048576)
                 if not data:
                     break
-                if action == 'download' and progress:
-                    progress(len(data))
                 response += data
         except util.urlerr.httperror as ex:
             raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)')
@@ -218,45 +217,51 @@
             raise error.ProgrammingError('invalid Git-LFS action: %s' % action)

         response = self._batchrequest(pointers, action)
-        prunningsize = [0]
         objects = self._extractobjects(response, pointers, action)
         total = sum(x.get('size', 0) for x in objects)
+        sizes = {}
+        for obj in objects:
+            sizes[obj.get('oid')] = obj.get('size', 0)
         topic = {'upload': _('lfs uploading'),
                  'download': _('lfs downloading')}[action]
         if self.ui.verbose and len(objects) > 1:
             self.ui.write(_('lfs: need to transfer %d objects (%s)\n')
                           % (len(objects), util.bytecount(total)))
         self.ui.progress(topic, 0, total=total)
-        def progress(size):
-            # advance progress bar by "size" bytes
-            prunningsize[0] += size
-            self.ui.progress(topic, prunningsize[0], total=total)
-        for obj in sorted(objects, key=lambda o: o.get('oid')):
-            objsize = obj.get('size', 0)
+        def transfer(chunk):
+            for obj in chunk:
+                objsize = obj.get('size', 0)
+                if self.ui.verbose:
+                    if action == 'download':
+                        msg = _('lfs: downloading %s (%s)\n')
+                    elif action == 'upload':
+                        msg = _('lfs: uploading %s (%s)\n')
+                    self.ui.write(msg % (obj.get('oid'),
+                                         util.bytecount(objsize)))
+                retry = self.retry
+                while True:
+                    try:
+                        self._basictransfer(obj, action, localstore)
+                        yield 1, obj.get('oid')
+                        break
+                    except Exception as ex:
+                        if retry > 0:
+                            if self.ui.verbose:
+                                self.ui.write(
+                                    _('lfs: failed: %r (remaining retry %d)\n')
+                                    % (ex, retry))
+                            retry -= 1
+                            continue
+                        raise
+
+        oids = worker.worker(self.ui, 0.1, transfer, (),
+                             sorted(objects, key=lambda o: o.get('oid')))
+        processed = 0
+        for _one, oid in oids:
+            processed += sizes[oid]
+            self.ui.progress(topic, processed, total=total)
             if self.ui.verbose:
-                if action == 'download':
-                    msg = _('lfs: downloading %s (%s)\n')
-                elif action == 'upload':
-                    msg = _('lfs: uploading %s (%s)\n')
-                self.ui.write(msg % (obj.get('oid'),
-                                     util.bytecount(objsize)))
-            origrunningsize = prunningsize[0]
-            retry = self.retry
-            while
D1460: workers: add config to enable/diable workers
wlis updated this revision to Diff 4023.
wlis retitled this revision from "workers: add config to enable/disable workers" to "workers: add config to enable/diable workers".

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D1460?vs=3667&id=4023

REVISION DETAIL
  https://phab.mercurial-scm.org/D1460

AFFECTED FILES
  mercurial/configitems.py
  mercurial/help/config.txt
  mercurial/worker.py

CHANGE DETAILS

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -82,7 +82,8 @@

     args - arguments to split into chunks, to pass to individual workers
     '''
-    if worthwhile(ui, costperarg, len(args)):
+    enabled = ui.configbool('worker', 'enabled', True)
+    if enabled and worthwhile(ui, costperarg, len(args)):
        return _platformworker(ui, func, staticargs, args)
     return func(*staticargs + (args,))

diff --git a/mercurial/help/config.txt b/mercurial/help/config.txt
--- a/mercurial/help/config.txt
+++ b/mercurial/help/config.txt
@@ -2551,6 +2551,10 @@
     directory updates in parallel on Unix-like systems, which greatly
     helps performance.

+``enabled``
+    Whether to enable workers code to be used.
+    (default: true)
+
 ``numcpus``
     Number of CPUs to use for parallel operations. A zero or negative
     value is treated as ``use the default``.

diff --git a/mercurial/configitems.py b/mercurial/configitems.py
--- a/mercurial/configitems.py
+++ b/mercurial/configitems.py
@@ -1148,6 +1148,9 @@
 coreconfigitem('worker', 'backgroundclosethreadcount',
     default=4,
 )
+coreconfigitem('worker', 'enabled',
+    default=True,
+)
 coreconfigitem('worker', 'numcpus',
     default=None,
 )

To: wlis, #hg-reviewers, lothiraldan
Cc: lothiraldan, mercurial-devel
D1564: worker: make windows workers daemons
wlis updated this revision to Diff 4024.

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D1564?vs=4015&id=4024

REVISION DETAIL
  https://phab.mercurial-scm.org/D1564

AFFECTED FILES
  mercurial/worker.py

CHANGE DETAILS

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -12,6 +12,7 @@
 import signal
 import sys
 import threading
+import time

 from .i18n import _
 from . import (
@@ -216,6 +217,7 @@
             self._func = func
             self._staticargs = staticargs
             self._interrupted = False
+            self.daemon = True
             self.exception = None

         def interrupt(self):
@@ -242,16 +244,22 @@
                 raise

     threads = []
-    def killworkers():
+    def trykillworkers():
+        # Allow up to 1 second to clean worker threads nicely
+        cleanupend = time.time() + 1
         for t in threads:
             t.interrupt()
         for t in threads:
-            # try to let the threads handle interruption, but don't wait
-            # indefintely. the thread could be in infinite loop, handling
-            # a very long task or in a deadlock situation
-            t.join(5)
+            remainingtime = cleanupend - time.time()
+            t.join(remainingtime)
             if t.is_alive():
-                raise error.Abort(_('failed to join worker thread'))
+                # pass over the workers joining failure. it is more
+                # important to surface the inital exception than the
+                # fact that one of workers may be processing a large
+                # task and does not get to handle the interruption.
+                ui.warn(_("failed to kill worker threads while "
+                          "handling an exception\n"))
+                return

     workers = _numworkers(ui)
     resultqueue = util.queue()
@@ -264,25 +272,19 @@
         t = Worker(taskqueue, resultqueue, func, staticargs)
         threads.append(t)
         t.start()
-
-    while len(threads) > 0:
-        while not resultqueue.empty():
-            yield resultqueue.get()
-        threads[0].join(0.05)
-        finishedthreads = [_t for _t in threads if not _t.is_alive()]
-        for t in finishedthreads:
-            if t.exception is not None:
-                try:
-                    killworkers()
-                except Exception:
-                    # pass over the workers joining failure. it is more
-                    # important to surface the inital exception than the
-                    # fact that one of workers may be processing a large
-                    # task and does not get to handle the interruption.
-                    ui.warn(_("failed to kill worker threads while handling "
-                              "an exception"))
-                raise t.exception
-            threads.remove(t)
+    try:
+        while len(threads) > 0:
+            while not resultqueue.empty():
+                yield resultqueue.get()
+            threads[0].join(0.05)
+            finishedthreads = [_t for _t in threads if not _t.is_alive()]
+            for t in finishedthreads:
+                if t.exception is not None:
+                    raise t.exception
+                threads.remove(t)
+    except Exception: # re-raises
+        trykillworkers()
+        raise
     while not resultqueue.empty():
         yield resultqueue.get()

To: wlis, #hg-reviewers, ikostia
Cc: durham, ikostia, mercurial-devel
D1457: workers: don't use backgroundfilecloser in threads
wlis updated this revision to Diff 4022.
wlis edited the summary of this revision.
wlis retitled this revision from "workers: create backgroundcloser per thread" to "workers: don't use backgroundfilecloser in threads".

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D1457?vs=3664&id=4022

REVISION DETAIL
  https://phab.mercurial-scm.org/D1457

AFFECTED FILES
  mercurial/vfs.py

CHANGE DETAILS

diff --git a/mercurial/vfs.py b/mercurial/vfs.py
--- a/mercurial/vfs.py
+++ b/mercurial/vfs.py
@@ -277,8 +277,12 @@
         to ``__call__``/``open`` to result in the file possibly being
         closed asynchronously, on a background thread.
         """
-        # This is an arbitrary restriction and could be changed if we ever
-        # have a use case.
+        # Sharing backgroundfilecloser between threads is complex and using
+        # multiple instances puts us at risk of running out of file descriptors
+        # only allow to use backgroundfilecloser when in main thread.
+        if not isinstance(threading.currentThread(), threading._MainThread):
+            yield
+            return
         vfs = getattr(self, 'vfs', self)
         if getattr(vfs, '_backgroundfilecloser', None):
             raise error.Abort(
@@ -413,7 +417,8 @@
                               ' valid for checkambig=True') % mode)
                 fp = checkambigatclosing(fp)

-            if backgroundclose:
+            if backgroundclose and \
+                isinstance(threading.currentThread(), threading._MainThread):
                 if not self._backgroundfilecloser:
                     raise error.Abort(_('backgroundclose can only be used when a '
                                         'backgroundclosing context manager is active')

To: wlis, #hg-reviewers, indygreg
Cc: indygreg, mercurial-devel
D1564: worker: make windows workers daemons
wlis added inline comments.

INLINE COMMENTS

> durham wrote in worker.py:286
> Why only do it on keyboard interrupt? What if there's another exception? If you did it for all
> exceptions, you could drop the trykillworkers() inside the loop, and just throw the exception up
> to here.

  this is a very good point. will do.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1564

To: wlis, #hg-reviewers, ikostia
Cc: durham, ikostia, mercurial-devel
D1564: worker: make windows workers daemons
wlis created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  The windows workers weren't daemons and were not correctly killed when ctrl-c'd from the
  terminal. With this change, when the main thread is killed, all daemon threads get killed as
  well. I also reduced the time we give workers to clean up nicely, so people don't ctrl-c again
  when they get impatient.

  The output when threads cleaned up nicely:

    PS C:\> hg.exe sparse --disable-profile SparseProfiles/.sparse
    interrupted!

  The output when threads don't clean up in 1 sec:

    PS C:\> hg.exe sparse --enable-profile SparseProfiles/.sparse
    failed to kill worker threads while handling an exception
    interrupted!
    Exception in thread Thread-4 (most likely raised during interpreter shutdown):
    PS C:\>

TEST PLAN
  Run hg commands on windows (pull/update/sparse). Ctrl-C'd a sparse --enable-profile command that
  was using threads and observed in process explorer that all threads got killed.

  Ran tests on CentOS.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1564

AFFECTED FILES
  mercurial/worker.py

CHANGE DETAILS

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -12,6 +12,7 @@
 import signal
 import sys
 import threading
+import time

 from .i18n import _
 from . import (
@@ -216,6 +217,7 @@
             self._func = func
             self._staticargs = staticargs
             self._interrupted = False
+            self.daemon = True
             self.exception = None

         def interrupt(self):
@@ -242,16 +244,22 @@
                 raise

     threads = []
-    def killworkers():
+    def trykillworkers():
+        # Allow up to 1 second to clean worker threads nicely
+        cleanupend = time.time() + 1
         for t in threads:
             t.interrupt()
         for t in threads:
-            # try to let the threads handle interruption, but don't wait
-            # indefintely. the thread could be in infinite loop, handling
-            # a very long task or in a deadlock situation
-            t.join(5)
+            remainingtime = cleanupend - time.time()
+            t.join(remainingtime)
             if t.is_alive():
-                raise error.Abort(_('failed to join worker thread'))
+                # pass over the workers joining failure. it is more
+                # important to surface the inital exception than the
+                # fact that one of workers may be processing a large
+                # task and does not get to handle the interruption.
+                ui.warn(_("failed to kill worker threads while "
+                          "handling an exception\n"))
+                return

     workers = _numworkers(ui)
     resultqueue = util.queue()
@@ -264,25 +272,20 @@
         t = Worker(taskqueue, resultqueue, func, staticargs)
         threads.append(t)
         t.start()
-
-    while len(threads) > 0:
-        while not resultqueue.empty():
-            yield resultqueue.get()
-        threads[0].join(0.05)
-        finishedthreads = [_t for _t in threads if not _t.is_alive()]
-        for t in finishedthreads:
-            if t.exception is not None:
-                try:
-                    killworkers()
-                except Exception:
-                    # pass over the workers joining failure. it is more
-                    # important to surface the inital exception than the
-                    # fact that one of workers may be processing a large
-                    # task and does not get to handle the interruption.
-                    ui.warn(_("failed to kill worker threads while handling "
-                              "an exception"))
-                raise t.exception
-            threads.remove(t)
+    try:
+        while len(threads) > 0:
+            while not resultqueue.empty():
+                yield resultqueue.get()
+            threads[0].join(0.05)
+            finishedthreads = [_t for _t in threads if not _t.is_alive()]
+            for t in finishedthreads:
+                if t.exception is not None:
+                    trykillworkers()
+                    raise t.exception
+                threads.remove(t)
+    except KeyboardInterrupt:
+        trykillworkers()
+        raise
     while not resultqueue.empty():
         yield resultqueue.get()

To: wlis, #hg-reviewers
Cc: mercurial-devel
D1458: workers: implemented worker on windows
wlis added a comment.

  The previous issues were related to the fb-hgext remotefilelog, and
  https://phab.mercurial-scm.org/D1513 fixes them on the remotefilelog side. I will still need to
  test this code on a repo without remotefilelog to make sure that the normal filelog doesn't hit
  similar issues.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1458

To: wlis, #hg-reviewers
Cc: indygreg, mercurial-devel
D1458: workers: implemented worker on windows
wlis planned changes to this revision.
wlis added a comment.

  I need to test these changes a bit more. I found a place in merge.py that has a risk of a race
  condition and need to figure out how to protect it.

  Right now there are 2 places where we use workers: 1 in core (merge.py), and we also use it in
  lfs in the fb extensions. We actually hit the CPU limit right away when we start using
  threading, and I initially looked into implementing something closer to what we do in posix, but
  as you say that is not easy on Windows, as fork doesn't exist. I think that from my perspective
  putting the applyupdates code into rust and starting multiple processes with that would work,
  and as you say it gives more room for perf improvement than this change. I am happy with making
  this code short-lived if it gets replaced by a better solution.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1458

To: wlis, #hg-reviewers
Cc: indygreg, mercurial-devel
D1457: workers: create backgroundcloser per thread
wlis added a comment.

  That sounds good. I actually started with a change to manage a single background closer shared
  between threads, but the locking code gets a bit complicated and seemed more risky. I didn't
  know the main reason for having 1 background closer was the number of file descriptors. I'll
  check what disabling the backgroundcloser does to the performance.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1457

To: wlis, #hg-reviewers, indygreg
Cc: indygreg, mercurial-devel
D1460: workers: add config to enable/diable workers
wlis created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  This adds config to disable/enable workers, with the default being enabled.

TEST PLAN
  Enabled a profile without updating .hg/hgrc (the default should be to use workers) and ran:

    hg sparse --enable-profile .sparse

  Watched in the process explorer that hg started 12 new threads for materializing files (this is
  my worker.numcpus value).

  Added the following to .hg/hgrc and re-ran the command:

    [worker]
    enabled = False

  This time hg didn't spawn any new threads for materializing files.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1460

AFFECTED FILES
  mercurial/help/config.txt
  mercurial/worker.py

CHANGE DETAILS

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -82,7 +82,8 @@

     args - arguments to split into chunks, to pass to individual workers
     '''
-    if worthwhile(ui, costperarg, len(args)):
+    enabled = ui.configbool('worker', 'enabled', True)
+    if enabled and worthwhile(ui, costperarg, len(args)):
         return _platformworker(ui, func, staticargs, args)
     return func(*staticargs + (args,))

diff --git a/mercurial/help/config.txt b/mercurial/help/config.txt
--- a/mercurial/help/config.txt
+++ b/mercurial/help/config.txt
@@ -2547,6 +2547,10 @@
     directory updates in parallel on Unix-like systems, which greatly
     helps performance.

+``enabled``
+    Whether to enable workers code to be used.
+    (default: true)
+
 ``numcpus``
     Number of CPUs to use for parallel operations. A zero or negative
     value is treated as ``use the default``.

To: wlis, #hg-reviewers
Cc: mercurial-devel
D1459: workers: handling exceptions in windows workers
wlis created this revision. Herald added a subscriber: mercurial-devel. Herald added a reviewer: hg-reviewers.
REVISION SUMMARY
This adds handling of exceptions from worker threads and resurfaces them as if the function ran without workers. If any of the threads throws, the main thread kills all running threads, giving them 5 seconds to handle the interruption, and raises the first exception received. We don't have to join threads whose is_alive() is false.
TEST PLAN
Ran multiple updates and sparse profile enables/disables, and things worked well. Ran tests on CentOS; all tests that pass on @ pass here. Added a forged exception into the worker code and got it properly resurfaced, with the rest of the workers killed: P58642088
PS C:\open\> ..\facebook-hg-rpms\build\hg\hg.exe --config extensions.fsmonitor=! sparse --enable-profile
updating [==>                ] 1300/39166 1m57s
Exception in thread Thread-3:
Traceback (most recent call last):
  File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\threading.py", line 801, in __bootstrap_inner
    self.run()
  File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 244, in run
    raise e
Exception: Forged exception
Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\threading.py", line 801, in __bootstrap_inner
    self.run()
  File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 244, in run
    raise e
Exception: Forged exception
<...>
Traceback (most recent call last):
  File "C:\open\facebook-hg-rpms\build\hg\hgexe.py", line 41, in <module>
    dispatch.run()
  File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 85, in run
    status = (dispatch(req) or 0) & 255
  File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 173, in dispatch
    ret = _runcatch(req)
  File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 324, in _runcatch
    return _callcatch(ui, _runcatchfunc)
  File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 332, in _callcatch
    return scmutil.callcatch(ui, func)
  File "C:\open\facebook-hg-rpms\build\hg\mercurial\scmutil.py", line 154, in callcatch
    return func()
  File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 314, in _runcatchfunc
    return _dispatch(req)
  File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 951, in _dispatch
    cmdpats, cmdoptions)
  File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\remotefilelog\__init__.py", line 415, in runcommand
    return orig(lui, repo, *args, **kwargs)
  File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\undo.py", line 118, in _runcommandwrapper
    result = orig(lui, repo, cmd, fullargs, *args)
  File "C:\open\facebook-hg-rpms\build\hg\hgext\journal.py", line 84, in runcommand
    return orig(lui, repo, cmd, fullargs, *args)
  File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\perftweaks.py", line 268, in _tracksparseprofiles
    res = runcommand(lui, repo, *args)
  File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\perftweaks.py", line 256, in _trackdirstatesizes
    res = runcommand(lui, repo, *args)
  File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\copytrace.py", line 144, in _runcommand
    return orig(lui, repo, cmd, fullargs, ui, *args, **kwargs)
  File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbamend\hiddenoverride.py", line 119, in runcommand
    result = orig(lui, repo, cmd, fullargs, *args)
  File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 712, in runcommand
    ret = _runcommand(ui, options, cmd, d)
  File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 959, in _runcommand
    return cmdfunc()
  File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 948, in <lambda>
    d = lambda: util.checksignature(func)(ui, *args, **strcmdopt)
  File "C:\open\facebook-hg-rpms\build\hg\mercurial\util.py", line 1183, in check
    return func(*args, **kwargs)
  File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 860, in sparse
    disableprofile=disableprofile, force=force)
  File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 949, in _config
    len, _refresh(ui, repo, oldstatus, oldsparsematch, force))
  File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 1116, in _refresh
    mergemod.applyupdates(repo, typeactions, repo[None], repo['.'], False)
  File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-p
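The kill-and-resurface flow described in the summary above can be sketched standalone. This is a minimal illustration, not the actual mercurial/worker.py code: _Worker, runworkers, and the event-based trykillworkers are invented names, and the 5-second grace period follows the summary.

import threading
import time

class _Worker(threading.Thread):
    """Toy worker that records the first exception it hits."""
    def __init__(self, func, task, stopevent):
        threading.Thread.__init__(self)
        self.exception = None
        self._func = func
        self._task = task
        self._stopevent = stopevent

    def run(self):
        try:
            if not self._stopevent.is_set():
                self._func(self._task)
        except BaseException as e:
            self.exception = e

def runworkers(func, tasks):
    stopevent = threading.Event()
    threads = [_Worker(func, t, stopevent) for t in tasks]
    for t in threads:
        t.start()

    def trykillworkers():
        # ask the remaining workers to stop, then give them 5 seconds
        # in total to handle the interruption before giving up on them
        stopevent.set()
        deadline = time.time() + 5.0
        for t in threads:
            t.join(max(0.0, deadline - time.time()))

    try:
        while threads:
            t = threads[0]
            t.join(0.05)           # poll instead of blocking forever
            if t.is_alive():
                continue
            if t.exception is not None:
                raise t.exception  # resurface the first worker exception
            threads.remove(t)      # already dead, no need to join again
    except (Exception, KeyboardInterrupt):  # re-raises
        trykillworkers()
        raise

# a forged exception, as in the test plan above
try:
    runworkers(lambda n: 1 / n, [1, 2, 0, 3])
except ZeroDivisionError:
    print('worker exception resurfaced in the main thread')

Note that, as in the real code, only threads[0] is polled; an exception in a later thread is not seen until the threads ahead of it in the list have finished.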
D1458: workers: implemented worker on windows
wlis created this revision. Herald added a subscriber: mercurial-devel. Herald added a reviewer: hg-reviewers.
REVISION SUMMARY
This change implements a thread-based worker on Windows. The handling of exceptions from within threads will happen in a separate diff. The worker is for now used in mercurial/merge.py and in the lfs extension. After multiple tests, with millions of files materialized and thousands of LFS blobs fetched, neither merge.py nor lfs/blobstore.py appears to have thread-safety problems. I also looked through the code and, besides the backgroundfilecloser (handled in the base of this), things look good. The performance boost of this on Windows is ~50% for sparse --enable-profile; hg up/rebase also speed up, though that was not exactly measured.
TEST PLAN
Ran tens of hg sparse --enable-profile and --disable-profile operations on large profiles and verified that workers are running. Used the Sysinternals suite to see that all threads are spawned and run as they should. Ran various other operations on the repo, including update and rebase. Ran tests on CentOS; all tests that pass on @ pass here.
REPOSITORY
rHG Mercurial
REVISION DETAIL
https://phab.mercurial-scm.org/D1458
AFFECTED FILES
mercurial/worker.py
CHANGE DETAILS
diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -11,6 +11,7 @@
 import os
 import signal
 import sys
+import threading
 
 from .i18n import _
 from . import (
@@ -53,7 +54,7 @@
             raise error.Abort(_('number of cpus must be an integer'))
     return min(max(countcpus(), 4), 32)
 
-if pycompat.isposix:
+if pycompat.isposix or pycompat.iswindows:
     _startupcost = 0.01
 else:
     _startupcost = 1e30
@@ -203,7 +204,51 @@
     elif os.WIFSIGNALED(code):
         return -os.WTERMSIG(code)
 
-if not pycompat.iswindows:
+def _windowsworker(ui, func, staticargs, args):
+    class Worker(threading.Thread):
+        def __init__(self, taskqueue, resultqueue, func, staticargs,
+                     group=None, target=None, name=None, verbose=None):
+            threading.Thread.__init__(self, group=group, target=target,
+                                      name=name, verbose=verbose)
+            self._taskqueue = taskqueue
+            self._resultqueue = resultqueue
+            self._func = func
+            self._staticargs = staticargs
+
+        def run(self):
+            while not self._taskqueue.empty():
+                try:
+                    args = self._taskqueue.get_nowait()
+                    for res in self._func(*self._staticargs + (args,)):
+                        self._resultqueue.put(res)
+                except util.empty:
+                    break
+
+    workers = _numworkers(ui)
+    threads = []
+    resultqueue = util.queue()
+    taskqueue = util.queue()
+    # partition work to more pieces than workers to minimize the chance
+    # of uneven distribution of large tasks between the workers
+    for pargs in partition(args, workers * 20):
+        taskqueue.put(pargs)
+    for _i in range(workers):
+        t = Worker(taskqueue, resultqueue, func, staticargs)
+        threads.append(t)
+        t.start()
+    while any(t.is_alive() for t in threads):
+        while not resultqueue.empty():
+            yield resultqueue.get()
+        t = threads[0]
+        t.join(0.05)
+        if not t.is_alive():
+            threads.remove(t)
+    while not resultqueue.empty():
+        yield resultqueue.get()
+
+if pycompat.iswindows:
+    _platformworker = _windowsworker
+else:
     _platformworker = _posixworker
     _exitstatus = _posixexitstatus
To: wlis, #hg-reviewers
Cc: mercurial-devel
___
Mercurial-devel mailing list
Mercurial-devel@mercurial-scm.org
https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel
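A note on the scheduling choice in the hunk above: the work is over-partitioned into workers * 20 slices on a shared queue, so one slow slice cannot leave the other threads idle near the end. That pattern can be exercised standalone; this sketch substitutes the stdlib queue module for Mercurial's util.queue wrapper, and partition here mirrors the round-robin split of worker.partition, while runqueue and drain are invented names.

import threading
try:
    import Queue as queue      # Python 2, matching the era of this patch
except ImportError:
    import queue               # Python 3

def partition(lst, nslices):
    # round-robin split, as worker.partition does
    return [lst[i::nslices] for i in range(nslices)]

def runqueue(func, items, workers=4):
    taskqueue = queue.Queue()
    resultqueue = queue.Queue()
    # over-partition so a single slow slice doesn't skew the load
    for chunk in partition(items, workers * 20):
        taskqueue.put(chunk)

    def drain():
        while True:
            try:
                chunk = taskqueue.get_nowait()
            except queue.Empty:
                break
            for item in chunk:
                resultqueue.put(func(item))

    threads = [threading.Thread(target=drain) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    out = []
    while not resultqueue.empty():
        out.append(resultqueue.get())
    return out

print(sorted(runqueue(lambda x: x * x, list(range(10)), workers=2)))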
D1457: workers: create backgroundcloser per thread
wlis created this revision. Herald added a subscriber: mercurial-devel. Herald added a reviewer: hg-reviewers.
REVISION SUMMARY
This allows creating a _backgroundfilecloser per thread. threading.local() manages the distinct storage between threads, and works for the main thread as well (when no actual threading is used).
TEST PLAN
Ran pull, update, and sparse commands and watched the closer threads get created and destroyed in procexp.exe. Ran tests on CentOS; no tests broken compared to the base.
REPOSITORY
rHG Mercurial
REVISION DETAIL
https://phab.mercurial-scm.org/D1457
AFFECTED FILES
mercurial/vfs.py
CHANGE DETAILS
diff --git a/mercurial/vfs.py b/mercurial/vfs.py
--- a/mercurial/vfs.py
+++ b/mercurial/vfs.py
@@ -269,27 +269,27 @@
         for dirpath, dirs, files in os.walk(self.join(path), onerror=onerror):
             yield (dirpath[prefixlen:], dirs, files)
 
+    threaddata = threading.local()
+
     @contextlib.contextmanager
     def backgroundclosing(self, ui, expectedcount=-1):
         """Allow files to be closed asynchronously.
 
         When this context manager is active, ``backgroundclose`` can be passed
         to ``__call__``/``open`` to result in the file possibly being closed
         asynchronously, on a background thread.
         """
-        # This is an arbitrary restriction and could be changed if we ever
-        # have a use case.
         vfs = getattr(self, 'vfs', self)
-        if getattr(vfs, '_backgroundfilecloser', None):
+        if getattr(vfs.threaddata, '_backgroundfilecloser', None):
             raise error.Abort(
-                _('can only have 1 active background file closer'))
+                _('can only have 1 active background file closer per thread'))
 
         with backgroundfilecloser(ui, expectedcount=expectedcount) as bfc:
             try:
-                vfs._backgroundfilecloser = bfc
+                vfs.threaddata._backgroundfilecloser = bfc
                 yield bfc
             finally:
-                vfs._backgroundfilecloser = None
+                vfs.threaddata._backgroundfilecloser = None
 
 class vfs(abstractvfs):
     '''Operate files relative to a base directory
@@ -414,12 +414,12 @@
             fp = checkambigatclosing(fp)
 
         if backgroundclose:
-            if not self._backgroundfilecloser:
+            if not self.threaddata._backgroundfilecloser:
                 raise error.Abort(_('backgroundclose can only be used when a '
                                     'backgroundclosing context manager is active')
                                   )
 
-            fp = delayclosedfile(fp, self._backgroundfilecloser)
+            fp = delayclosedfile(fp, self.threaddata._backgroundfilecloser)
 
         return fp
To: wlis, #hg-reviewers
Cc: mercurial-devel
___
Mercurial-devel mailing list
Mercurial-devel@mercurial-scm.org
https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel
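The core trick in this diff is hanging a threading.local() off the class so that a shared object gets per-thread attribute storage. A minimal sketch of that pattern follows, with invented names (closerhost, acquire, release) rather than the real vfs API.

import threading

class closerhost(object):
    # class-level threading.local(), as in the hunk above: every instance
    # shares it, but each thread sees its own attribute namespace on it
    threaddata = threading.local()

    def acquire(self):
        # the "only 1 active" check now applies per thread, not globally
        if getattr(self.threaddata, '_active', None):
            raise RuntimeError('can only have 1 active closer per thread')
        self.threaddata._active = True

    def release(self):
        self.threaddata._active = None

host = closerhost()

def work():
    host.acquire()   # succeeds in every thread independently
    host.release()

threads = [threading.Thread(target=work) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print('each thread held its own "single" active closer')

Because the main thread also sees its own namespace on the local(), the unthreaded code path needs no special casing, which is the point made in the summary.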