On Mon, 9 May 2011 02:46:09 +0300
Onur Küçük <o...@pardus.org.tr> wrote:

> 
> On Thu, 28 Apr 2011 16:48:54 +0000
> Gökçen Eraslan <gok...@pardus.org.tr> wrote:
> 
> > Ozan'la birlikte geçenlerde yaptığımız girişimler sonucunda ekteki
> > yama çıktı ortaya. Özetle normalde index.py'deki Index sınıfının
> > index methodunda for döngüsü içinde her paket için tek tek yapılan
> > pspec.xml/metadata.xml parse etme, sha1sum hesaplama işlemini
> > Python'un harika multiprocessing modülünü kullanarak bir süreç
> > havuzunda yaptırttık ve 2011 devel farmında indeksleme işlemi 1dk
> > 56sn'den, 49sn'ye indi. Bu arada bugün bir değişiklik daha yapıp,
> > kaynak paketlerin indekslenmesi için de aynı paralelleşmeyi
> > sağladım. Kaynak depolar için de test edebilirsiniz.
> 
>  Pek leziz olmuş, elinize sağlık. multiprocessing yüzünden nesne
> temelinden uzaklaştık ama bu değişiklik için bence değer, daha sonra
> sakin bir kafayla belki bu işi biraz daha farklı yaparız.

 Ekte de 2009'a uyarlanmış hali var. Sırf sha1sum hesaplanmasının
paralelleşmesi bile bayağı bir performans kazandırıyor (%50-80),
güzel oldu bu :)

-- 
 Onur Küçük                                      Knowledge speaks,
 <onur.--.-.pardus.org.tr>                       but wisdom listens

diff -Nur pisi-2.3.2-old/pisi/index.py pisi-2.3.2/pisi/index.py
--- pisi-2.3.2-old/pisi/index.py	2011-05-09 02:47:29.146619068 +0300
+++ pisi-2.3.2/pisi/index.py	2011-05-09 02:59:29.216629354 +0300
@@ -14,6 +14,7 @@
 
 import os
 import shutil
+import multiprocessing
 
 import gettext
 __trans = gettext.translation('pisi', fallback=True)
@@ -30,6 +31,8 @@
 import pisi.pxml.autoxml as autoxml
 import pisi.component as component
 import pisi.group as group
+import pisi.operations.build
+
 
 class Error(pisi.Error):
     pass
@@ -85,7 +88,9 @@
         self.repo_dir = repo_uri
 
         packages = []
+        specs = []  # (pspec.xml path, repo_uri) tuples for the worker pool
         deltas = {}
+
         for root, dirs, files in os.walk(repo_uri):
             for fn in files:
 
@@ -96,26 +101,61 @@
                     packages.append(os.path.join(root, fn))
 
                 if fn == 'components.xml':
-                    self.add_components(os.path.join(root, fn))
+                    self.components.extend(add_components(os.path.join(root, fn)))
                 if fn == 'pspec.xml' and not skip_sources:
-                    self.add_spec(os.path.join(root, fn), repo_uri)
+                    specs.append((os.path.join(root, fn), repo_uri))
                 if fn == 'distribution.xml':
-                    self.add_distro(os.path.join(root, fn))
+                    self.distribution = add_distro(os.path.join(root, fn))
                 if fn == 'groups.xml':
-                    self.add_groups(os.path.join(root, fn))
+                    self.groups.extend(add_groups(os.path.join(root, fn)))
+
+        # Create a process pool with one worker process per CPU (the Pool() default)
+        pool = multiprocessing.Pool()
+
+        try:
+            # Add source packages to index using a process pool
+            self.specs = pool.map(add_spec, specs)
+        except:
+            # If an exception occurs (e.g. a keyboard interrupt), immediately terminate the worker
+            # processes and propagate the exception. (The CLI handles KeyboardInterrupt; if you are
+            # not using the CLI, you must handle KeyboardInterrupt yourself.)
+            pool.terminate()
+            raise
 
         try:
             obsoletes_list = map(str, self.distribution.obsoletes)
         except AttributeError:
             obsoletes_list = []
 
+        latest_packages = []
+
         for pkg in util.filter_latest_packages(packages):
             pkg_name = util.parse_package_name(os.path.basename(pkg))[0]
             if pkg_name not in obsoletes_list:
-                ctx.ui.info(_('Adding %s to package index') % pkg)
-                self.add_package(pkg, deltas, repo_uri)
+                # multiprocessing.Pool.map passes exactly one argument to the
+                # worker function, so the parameters are packed into a tuple
+                # here and unpacked again in add_package().
+                latest_packages.append((pkg, deltas, repo_uri))
+
+        try:
+            # Add binary packages to index using the same process pool
+            self.packages = pool.map(add_package, latest_packages)
+        except:
+            pool.terminate()
+            raise
+        else:
+            # Clear the progress line, then let the idle workers exit and reap them.
+            ctx.ui.info("\r%-80.80s" % (_('Done.')))
+            pool.close()
+            pool.join()
+
+def add_package(params):
+    """Build and return the index entry for one (path, deltas, repo_uri) tuple."""
+    try:
+        path, deltas, repo_uri = params
+
+        ctx.ui.info("\r%-80.80s" % (_('Adding package to index: %s') % os.path.basename(path)), noln=True)
 
-    def add_package(self, path, deltas, repo_uri):
         package = pisi.package.Package(path, 'r')
         md = package.get_metadata()
         md.package.packageSize = long(os.path.getsize(path))
@@ -149,43 +189,54 @@
                     if int(buildTo) == int(md.package.build):
                         md.package.deltaPackages.append(delta)
 
-            self.packages.append(md.package)
-
-    def add_groups(self, path):
-        ctx.ui.info("Adding groups.xml to index...")
-        groups_xml = group.Groups()
-        groups_xml.read(path)
-        for grp in groups_xml.groups:
-            self.groups.append(grp)
-
-    def add_components(self, path):
-        ctx.ui.info("Adding components.xml to index...")
-        components_xml = component.Components()
-        components_xml.read(path)
-        #try:
-        for comp in components_xml.components:
-            self.components.append(comp)
-        #except:
-        #    raise Error(_('Component in %s is corrupt') % path)
-            #ctx.ui.error(str(Error(*errs)))
-
-    def add_distro(self, path):
-        ctx.ui.info("Adding distribution.xml to index...")
-        distro = component.Distribution()
-        #try:
-        distro.read(path)
-        self.distribution = distro
-        #except:
-        #    raise Error(_('Distribution in %s is corrupt') % path)
-            #ctx.ui.error(str(Error(*errs)))
+        # NOTE(review): the old Index.add_package appended md.package one
+        # indentation level deeper than this return; confirm that packages the
+        # old code did not append are not now returned (and indexed) anyway.
+        return md.package
 
-    def add_spec(self, path, repo_uri):
-        import pisi.operations.build
+    except KeyboardInterrupt:
+        # Turn KeyboardInterrupt into a plain Exception: this avoids a traceback
+        # from every worker process on Ctrl-C and still makes pool.map() fail in
+        # the main process.
+        #
+        # A bare 'raise' would be preferable, but the multiprocessing module only
+        # propagates exceptions derived from Exception (see python#8296,
+        # python#9205 and python#9207), so the interrupt is re-raised manually.
+
+        raise Exception('KeyboardInterrupt in worker process')
+
+def add_groups(path):
+    """Read the groups.xml file at path and return the groups it defines."""
+    ctx.ui.info("Adding groups.xml to index...")
+    groups_xml = group.Groups()
+    groups_xml.read(path)
+    return groups_xml.groups
+
+def add_components(path):
+    """Read the components.xml file at path and return the components it defines."""
+    ctx.ui.info("Adding components.xml to index...")
+    components_xml = component.Components()
+    components_xml.read(path)
+    # TODO: raise Error(_('Component in %s is corrupt') % path) on a bad file
+    return components_xml.components
+
+def add_distro(path):
+    """Read the distribution.xml file at path and return the Distribution object."""
+    ctx.ui.info("Adding distribution.xml to index...")
+    distro = component.Distribution()
+    # TODO: raise Error(_('Distribution in %s is corrupt') % path) on a bad file
+    distro.read(path)
+    return distro
+
+def add_spec(params):
+    """Return the source-index spec for a (pspec.xml path, repo_uri) tuple."""
+    try:
+        path, repo_uri = params
         ctx.ui.info(_('Adding %s to source index') % path)
         #TODO: may use try/except to handle this
         builder = pisi.operations.build.Builder(path)
-            #ctx.ui.error(_('SpecFile in %s is corrupt, skipping...') % path)
-            #ctx.ui.error(str(Error(*errs)))
+        #ctx.ui.error(_('SpecFile in %s is corrupt, skipping...') % path)
+        #ctx.ui.error(str(Error(*errs)))
         builder.fetch_component()
         sf = builder.spec
         if ctx.config.options and ctx.config.options.absolute_urls:
@@ -193,4 +244,8 @@
         else:                           # create relative path by default
             sf.source.sourceURI = util.removepathprefix(repo_uri, path)
             # check component
-        self.specs.append(sf)
+        return sf
+
+    except KeyboardInterrupt:
+        # Re-raise as a plain Exception; see the comment in add_package()
+        raise Exception('KeyboardInterrupt in worker process')
_______________________________________________
Gelistirici mailing list
Gelistirici@pardus.org.tr
http://liste.pardus.org.tr/mailman/listinfo/gelistirici

Reply via email to