Selamlar,

Uzun zamandır aklımda olan bir şey pisi'nin index kodunu biraz hızlandırmak. 
Farmı her dürttüğümde hem ikili deponun hem debug deposunun index'lenmesini 
beklemek çok uzun sürüyor.

Index hızlandırma işi ne zamandır aklımdan çıkmış olsa da, geçenlerde aklıma 
gelen "Biz neden hala farmı elle çalıştırıyoruz, her paket commit edildiğinde 
otomatik derlemeye başlasa ya!?!" düşüncesini sesli dile getirdiğimde, Onur 
bunun için index'in de hızlandırılması gerektiğini söylediğinde tekrar 
alevlendi. Zira derlenmesi üçer saniye süren üç paketi arka arkaya commit 
ettiğimizde indexlerle birlikte totalde dakikalarca indexleme yapılmasını 
beklemek akıl kârı değil.

Ozan'la birlikte geçenlerde yaptığımız girişimler sonucunda ekteki yama çıktı 
ortaya. Özetle normalde index.py'deki Index sınıfının index methodunda for 
döngüsü içinde her paket için tek tek yapılan pspec.xml/metadata.xml parse 
etme, sha1sum hesaplama işlemini Python'un harika multiprocessing modülünü 
kullanarak bir süreç havuzunda yaptırttık ve 2011 devel farmında indeksleme 
işlemi 1dk 56sn'den, 49sn'ye indi. Bu arada bugün bir değişiklik daha yapıp, 
kaynak paketlerin indekslenmesi için de aynı paralelleşmeyi sağladım. Kaynak 
depolar için de test edebilirsiniz.

Ortaya çıkan beğenmediğim birkaç husus var:

1- index.py nesne temelli tasarımdan biraz uzaklaşmış oldu. Bunun nedeni 
multiprocessing methodlarının sınıf methodları üzerinde çalışmıyor oluşu. 
Sınıf methodları değil muhakkak normal fonksiyonlar vermek gerekiyor 
multiprocessing modülüne paralelleştirme için.

2- Pool.map fonksiyonu sadece tek parametre alan fonksiyonlarda çalıştığı 
için, add_* fonksiyonlarının parametre sayısını bire indirmek zorunda kaldık.

3- Klavye kesmelerini yakalamak için küçük bir hack yapmak gerekiyor, detaylar 
yamanın içinde yazıyor yorum olarak.  Bununla ilgili python'da 3 tane açık 
hata var, yamada yazdım neler olduğunu. Bakıp yorumlayabilirsiniz.

4- Nadiren, Control+C'ye iki kez basmak gerekebiliyor indeks işlemini iptal 
etmek için. Üstteki durumdan kaynaklanıyor bu da.

Yamaya bir göz atın, deneyin, test edin, performansını ölçün. 

Ben bu üsttekilerin engelleyici sorunlar olduğunu düşünmüyorum yamayı içeri 
almak için ame yine de burada tartışalım.

Not: İndeks çıktısında daha önce indekslenen tüm paketler yeni satırla 
bölünerek destan gibi ekrana basılıyordu. Yamada bunu da değiştirdim, 
"\r%-80.80s" şeklinde tek satırda yazdırıyorum artık tüm paket isimlerini.

-- 
Gökçen Eraslan
Index: index.py
===================================================================
--- index.py	(revision 36769)
+++ index.py	(working copy)
@@ -14,6 +14,7 @@
 
 import os
 import shutil
+import multiprocessing
 
 import gettext
 __trans = gettext.translation('pisi', fallback=True)
@@ -30,7 +31,9 @@
 import pisi.pxml.autoxml as autoxml
 import pisi.component as component
 import pisi.group as group
+import pisi.operations.build
 
+
 class Error(pisi.Error):
     pass
 
@@ -85,7 +88,9 @@
         self.repo_dir = repo_uri
 
         packages = []
+        specs = []
         deltas = {}
+
         for root, dirs, files in os.walk(repo_uri):
             for fn in files:
 
@@ -96,29 +101,64 @@
                     packages.append(os.path.join(root, fn))
 
                 if fn == 'components.xml':
-                    self.add_components(os.path.join(root, fn))
+                    self.components.extend(add_components(os.path.join(root, fn)))
                 if fn == 'pspec.xml' and not skip_sources:
-                    self.add_spec(os.path.join(root, fn), repo_uri)
+                    specs.append((os.path.join(root, fn), repo_uri))
                 if fn == 'distribution.xml':
-                    self.add_distro(os.path.join(root, fn))
+                    self.distribution = add_distro(os.path.join(root, fn))
                 if fn == 'groups.xml':
-                    self.add_groups(os.path.join(root, fn))
+                    self.groups.extend(add_groups(os.path.join(root, fn)))
 
+        # Create a process pool, as many processes as the number of CPUs we have
+        pool = multiprocessing.Pool()
+
         try:
+            # Add source packages to index using a process pool
+            self.specs = pool.map(add_spec, specs)
+        except:
+            # If an exception occurs (like a keyboard interrupt), immediately terminate worker 
+            # processes and propagate exception. (CLI honors KeyboardInterrupt exception, if you're 
+            # not using CLI, you must handle KeyboardException yourself)
+            pool.terminate()
+            raise
+
+        try:
             obsoletes_list = map(str, self.distribution.obsoletes)
         except AttributeError:
             obsoletes_list = []
 
+        latest_packages = []
+
         for pkg in util.filter_latest_packages(packages):
             pkg_name = util.parse_package_name(os.path.basename(pkg))[0]
             if pkg_name.endswith(ctx.const.debug_name_suffix):
                 pkg_name = util.remove_suffix(ctx.const.debug_name_suffix,
                                               pkg_name)
             if pkg_name not in obsoletes_list:
-                ctx.ui.info(_('Adding %s to package index') % pkg)
-                self.add_package(pkg, deltas, repo_uri)
+                # Currently, multiprocessing.Pool.map method accepts methods
+                # with single parameters only. So we have to send our parameters
+                # as a tuple to workaround that
 
-    def add_package(self, path, deltas, repo_uri):
+                latest_packages.append((pkg, deltas, repo_uri))
+
+        print "Starting binary..."
+        try:
+            # Add binary packages to index using a process pool
+            self.packages = pool.map(add_package, latest_packages)
+        except:
+            pool.terminate()
+            raise
+        else:
+            # Clean up output
+            ctx.ui.info("\r%-80.80s" % (_('Done.')))
+            print "Done"
+
+def add_package(params):
+    try:
+        path, deltas, repo_uri = params
+
+        ctx.ui.info("\r%-80.80s" % (_('Adding package to index: %s') % os.path.basename(path)), noln = True)
+
         package = pisi.package.Package(path, 'r')
         md = package.get_metadata()
         md.package.packageSize = long(os.path.getsize(path))
@@ -160,43 +200,54 @@
 
                     md.package.deltaPackages.append(delta)
 
-            self.packages.append(md.package)
+        return md.package
 
-    def add_groups(self, path):
-        ctx.ui.info("Adding groups.xml to index...")
-        groups_xml = group.Groups()
-        groups_xml.read(path)
-        for grp in groups_xml.groups:
-            self.groups.append(grp)
+    except KeyboardInterrupt:
+        # Handle KeyboardInterrupt exception to prevent ugly backtrace of all worker processes
+        # and propagate the exception to main process.
+        #
+        # Probably it's better to use just 'raise' here, but multiprocessing module has some bugs about that:
+        # (python#8296, python#9205 and python#9207 )
+        #
+        # For now, worker processes do not propagate exceptions other than Exception (like KeyboardInterrupt),
+        # so we have to manually propagate KeyboardInterrupt exception as an Exception.
 
-    def add_components(self, path):
-        ctx.ui.info("Adding components.xml to index...")
-        components_xml = component.Components()
-        components_xml.read(path)
-        #try:
-        for comp in components_xml.components:
-            self.components.append(comp)
-        #except:
-        #    raise Error(_('Component in %s is corrupt') % path)
-            #ctx.ui.error(str(Error(*errs)))
+        raise Exception
 
-    def add_distro(self, path):
-        ctx.ui.info("Adding distribution.xml to index...")
-        distro = component.Distribution()
-        #try:
-        distro.read(path)
-        self.distribution = distro
-        #except:
-        #    raise Error(_('Distribution in %s is corrupt') % path)
-            #ctx.ui.error(str(Error(*errs)))
+def add_groups(path):
+    ctx.ui.info("Adding groups.xml to index...")
+    groups_xml = group.Groups()
+    groups_xml.read(path)
+    return groups_xml.groups
 
-    def add_spec(self, path, repo_uri):
-        import pisi.operations.build
+def add_components(path):
+    ctx.ui.info("Adding components.xml to index...")
+    components_xml = component.Components()
+    components_xml.read(path)
+    #try:
+    return components_xml.components
+    #except:
+    #    raise Error(_('Component in %s is corrupt') % path)
+    #ctx.ui.error(str(Error(*errs)))
+
+def add_distro(path):
+    ctx.ui.info("Adding distribution.xml to index...")
+    distro = component.Distribution()
+    #try:
+    distro.read(path)
+    return distro
+    #except:
+    #    raise Error(_('Distribution in %s is corrupt') % path)
+    #ctx.ui.error(str(Error(*errs)))
+
+def add_spec(params):
+    try:
+        path , repo_uri = params
         ctx.ui.info(_('Adding %s to source index') % path)
         #TODO: may use try/except to handle this
         builder = pisi.operations.build.Builder(path)
-            #ctx.ui.error(_('SpecFile in %s is corrupt, skipping...') % path)
-            #ctx.ui.error(str(Error(*errs)))
+        #ctx.ui.error(_('SpecFile in %s is corrupt, skipping...') % path)
+        #ctx.ui.error(str(Error(*errs)))
         builder.fetch_component()
         sf = builder.spec
         if ctx.config.options and ctx.config.options.absolute_urls:
@@ -204,4 +255,8 @@
         else:                           # create relative path by default
             sf.source.sourceURI = util.removepathprefix(repo_uri, path)
             # check component
-        self.specs.append(sf)
+        return sf
+
+    except KeyboardInterrupt:
+        # Multiprocessing hack, see add_package method for explanation
+        raise Exception

Attachment: signature.asc
Description: This is a digitally signed message part.

_______________________________________________
Gelistirici mailing list
Gelistirici@pardus.org.tr
http://liste.pardus.org.tr/mailman/listinfo/gelistirici

Cevap