joerg.sonnenberger created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  Introduce a new option server.streamunbundle which starts a transaction
  immediately to apply a bundle instead of writing it to a temporary file
  first. This sidesteps the need for a large temporary directory at the
  cost of preventing concurrent pushes. This is a reasonable trade-off for
  many setups, since concurrent pushes, at least to the main branch, are
  disallowed anyway. The option defaults to off to preserve existing behavior.
  
  Change the wireproto interface: replace forwardpayload(fp) with
  getpayload(), which provides a generator for reading the raw payload, and
  make callers responsible for consuming all of the data.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D2470

AFFECTED FILES
  hgext/largefiles/proto.py
  mercurial/configitems.py
  mercurial/help/config.txt
  mercurial/wireproto.py
  mercurial/wireprotoserver.py
  mercurial/wireprototypes.py

CHANGE DETAILS

diff --git a/mercurial/wireprototypes.py b/mercurial/wireprototypes.py
--- a/mercurial/wireprototypes.py
+++ b/mercurial/wireprototypes.py
@@ -92,10 +92,11 @@
         returns a list of values (same order as <args>)"""
 
     @abc.abstractmethod
-    def forwardpayload(self, fp):
-        """Read the raw payload and forward to a file.
+    def getpayload(self):
+        """Provide a generator for the raw payload.
 
-        The payload is read in full before the function returns.
+        The caller is responsible for ensuring that the full payload is
+        processed.
         """
 
     @abc.abstractmethod
diff --git a/mercurial/wireprotoserver.py b/mercurial/wireprotoserver.py
--- a/mercurial/wireprotoserver.py
+++ b/mercurial/wireprotoserver.py
@@ -91,16 +91,15 @@
         args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
         return args
 
-    def forwardpayload(self, fp):
+    def getpayload(self):
         if r'HTTP_CONTENT_LENGTH' in self._req.env:
             length = int(self._req.env[r'HTTP_CONTENT_LENGTH'])
         else:
             length = int(self._req.env[r'CONTENT_LENGTH'])
         # If httppostargs is used, we need to read Content-Length
         # minus the amount that was consumed by args.
         length -= int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
-        for s in util.filechunkiter(self._req, limit=length):
-            fp.write(s)
+        return util.filechunkiter(self._req, limit=length)
 
     @contextlib.contextmanager
     def mayberedirectstdio(self):
@@ -346,7 +345,7 @@
                 data[arg] = val
         return [data[k] for k in keys]
 
-    def forwardpayload(self, fpout):
+    def getpayload(self):
         # We initially send an empty response. This tells the client it is
         # OK to start sending data. If a client sees any other response, it
         # interprets it as an error.
@@ -359,7 +358,7 @@
         # 0\n
         count = int(self._fin.readline())
         while count:
-            fpout.write(self._fin.read(count))
+            yield self._fin.read(count)
             count = int(self._fin.readline())
 
     @contextlib.contextmanager
diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py
--- a/mercurial/wireproto.py
+++ b/mercurial/wireproto.py
@@ -972,14 +972,29 @@
     with proto.mayberedirectstdio() as output:
         try:
             exchange.check_heads(repo, their_heads, 'preparing changes')
+            cleanup = lambda: None
+            try:
+                payload = proto.getpayload()
+                if repo.ui.configbool('server', 'streamunbundle'):
+                    def cleanup():
+                        for p in payload:
+                            pass
+                    fp = util.chunkbuffer(payload)
+                else:
+                    # write bundle data to temporary file because it can be big
+                    fp, tempname = None, None
+                    def cleanup():
+                        if fp:
+                            fp.close()
+                        if tempname:
+                            os.unlink(tempname)
+                    fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
+                    fp = os.fdopen(fd, pycompat.sysstr('wb+'))
+                    r = 0
+                    for p in payload:
+                        fp.write(p)
+                    fp.seek(0)
 
-            # write bundle data to temporary file because it can be big
-            fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
-            fp = os.fdopen(fd, pycompat.sysstr('wb+'))
-            r = 0
-            try:
-                proto.forwardpayload(fp)
-                fp.seek(0)
                 gen = exchange.readbundle(repo.ui, fp, None)
                 if (isinstance(gen, changegroupmod.cg1unpacker)
                     and not bundle1allowed(repo, 'push')):
@@ -1001,8 +1016,7 @@
                 return pushres(r, output.getvalue() if output else '')
 
             finally:
-                fp.close()
-                os.unlink(tempname)
+                cleanup()
 
         except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
             # handle non-bundle2 case first
diff --git a/mercurial/help/config.txt b/mercurial/help/config.txt
--- a/mercurial/help/config.txt
+++ b/mercurial/help/config.txt
@@ -1792,6 +1792,11 @@
     are highly recommended. Partial clones will still be allowed.
     (default: False)
 
+``streamunbundle``
+    When set, servers will apply data sent from the client directly;
+    otherwise it will be written to a temporary file first. This option
+    effectively prevents concurrent pushes. (default: False)
+
 ``concurrent-push-mode``
     Level of allowed race condition between two pushing clients.
 
diff --git a/mercurial/configitems.py b/mercurial/configitems.py
--- a/mercurial/configitems.py
+++ b/mercurial/configitems.py
@@ -902,6 +902,9 @@
 coreconfigitem('server', 'disablefullbundle',
     default=False,
 )
+coreconfigitem('server', 'streamunbundle',
+    default=False,
+)
 coreconfigitem('server', 'maxhttpheaderlen',
     default=1024,
 )
diff --git a/hgext/largefiles/proto.py b/hgext/largefiles/proto.py
--- a/hgext/largefiles/proto.py
+++ b/hgext/largefiles/proto.py
@@ -41,7 +41,8 @@
         tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
 
         try:
-            proto.forwardpayload(tmpfp)
+            for p in proto.getpayload():
+                tmpfp.write(p)
             tmpfp._fp.seek(0)
             if sha != lfutil.hexsha1(tmpfp._fp):
                 raise IOError(0, _('largefile contents do not match hash'))



To: joerg.sonnenberger, #hg-reviewers
Cc: mercurial-devel
_______________________________________________
Mercurial-devel mailing list
Mercurial-devel@mercurial-scm.org
https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel

Reply via email to