09 Mayıs 2011 Pazartesi günü (saat 18:06:35) Fatih Aşıcı şunları yazmıştı:
>  On Thu, 28 Apr 2011 16:48:54 +0000, Gökçen Eraslan <gok...@pardus.org.tr>
> 
>  wrote:
> > Selamlar,
> 
>  Selamlar,
>

Selam,
 
> > Uzun zamandır aklımda olan bir şey pisi'nin index kodunu biraz
> > hızlandırmak.
> > Farmı her dürttüğümde hem ikili deponun hem debug deposunun
> > index'lenmesini
> > beklemek çok uzun sürüyor.
> > 
> > Index hızlandırma işi ne zamandır aklımdan çıkmış olsa da, geçenlerde
> > aklıma
> > gelen "Biz neden hala farmı elle çalıştırıyoruz, her paket commit
> > edildiğinde
> > otomatik derlemeye başlasa ya!?!" düşüncesini sesli dile getirdiğimde,
> > Onur
> > bunun için index'in de hızlandırılması gerektiğini söylediğinde tekrar
> > alevlendi. Zira derlenmesi üçer saniye süren üç paketi arka arkaya commit
> > ettiğimizde indexlerle birlikte totalde dakikalarca indexleme yapılmasını
> > beklemek akıl kârı değil.
> > 
> > Ozan'la birlikte geçenlerde yaptığımız girişimler sonucunda ekteki yama
> > çıktı
> > ortaya. Özetle normalde index.py'deki Index sınıfının index methodunda
> > for döngüsü içinde her paket için tek tek yapılan pspec.xml/metadata.xml
> > parse etme, sha1sum hesaplama işlemini Python'un harika multiprocessing
> > modülünü kullanarak bir süreç havuzunda yaptırttık ve 2011 devel
> > farmında indeksleme
> > işlemi 1dk 56sn'den, 49sn'ye indi. Bu arada bugün bir değişiklik daha
> > yapıp,
> > kaynak paketlerin indekslenmesi için de aynı paralelleşmeyi sağladım.
> > Kaynak
> > depolar için de test edebilirsiniz.
> 
>  Elinize sağlık. Arada print'li satırlar kalmış. Bunları ctx.ui kullanacak
>  hale getirmelisiniz eğer gerekliyse. Bir de bazı satırlar (özellikle yorum
>  satırları) 79 sütun sınırını geçmiş. Bunları da kırpalım.
> 

Bunları hallettim sanırım, güncel yamayı ekte gönderiyorum.

PS: Bunun için "set cc=80" kullandım vim'de, baya güzel oluyormuş. Bir de, 
'gq' kombinasyonu da otomatik 79'a indiriyor seçili bölümü. Bir de "set tw=80" 
var, tabi.

> > Ortaya çıkan beğenmediğim birkaç husus var:
> > 
> > 1- index.py nesne temelli tasarımdan biraz uzaklaşmış oldu. Bunun nedeni
> > multiprocessing methodlarının sınıf methodları üzerinde çalışmıyor oluşu.
> > Sınıf methodları değil muhakkak normal fonksiyonlar vermek gerekiyor
> > multiprocessing modülüne paralelleştirme için.
> 
>  Sorun değil. Pisi'deki her modül nesne temelli tasarlanmamış zaten.
> 
> > 2- Pool.map fonksiyonu sadece tek parametre alan fonksiyonlarda çalıştığı
> > için, add_* fonksiyonlarının parametre sayısını bire indirmek zorunda
> > kaldık.
> 
>  Bunun daha şık bir yolu vardır belki. Şimdilik böyle olsun.
> 
> > 3- Klavye kesmelerini yakalamak için küçük bir hack yapmak gerekiyor,
> > detaylar
> > yamanın içinde yazıyor yorum olarak.  Bununla ilgili python'da 3 tane
> > açık hata var, yamada yazdım neler olduğunu. Bakıp yorumlayabilirsiniz.
> > 
> > 4- Nadiren, Control+C'ye iki kez basmak gerekebiliyor indeks işlemini
> > iptal
> > etmek için. Üstteki durumdan kaynaklanıyor bu da.
> > 
> > Yamaya bir göz atın, deneyin, test edin, performansını ölçün.
> > 
> > Ben bu üsttekilerin engelleyici sorunlar olduğunu düşünmüyorum yamayı
> > içeri
> > almak için ame yine de burada tartışalım.
> 
>  +1. Benden OK.
>

Commit ediyorum bu halini o zaman. Pakete alayım mı peki, pisi release 
edebilecek misin? Ya da biz edelim mi? Nasıl yapalım?
 
> > Not: İndeks çıktısında daha önce indekslenen tüm paketler yeni satırla
> > bölünerek destan gibi ekrana basılıyordu. Yamada bunu da değiştirdim,
> > "\r%-80.80s" şeklinde tek satırda yazdırıyorum artık tüm paket
> > isimlerini.
> 
>  Güzel olmuş. Ben de index'in gereksiz yere uzun bir çıktı verdiğini
>  düşünüyordum.
> 
> _______________________________________________
> Gelistirici mailing list
> Gelistirici@pardus.org.tr
> http://liste.pardus.org.tr/mailman/listinfo/gelistirici

-- 
Gökçen Eraslan
Index: pisi/index.py
===================================================================
--- pisi/index.py	(revision 36844)
+++ pisi/index.py	(working copy)
@@ -14,6 +14,7 @@
 
 import os
 import shutil
+import multiprocessing
 
 import gettext
 __trans = gettext.translation('pisi', fallback=True)
@@ -30,7 +31,9 @@
 import pisi.pxml.autoxml as autoxml
 import pisi.component as component
 import pisi.group as group
+import pisi.operations.build
 
+
 class Error(pisi.Error):
     pass
 
@@ -85,7 +88,9 @@
         self.repo_dir = repo_uri
 
         packages = []
+        specs = []
         deltas = {}
+
         for root, dirs, files in os.walk(repo_uri):
             for fn in files:
 
@@ -96,29 +101,66 @@
                     packages.append(os.path.join(root, fn))
 
                 if fn == 'components.xml':
-                    self.add_components(os.path.join(root, fn))
+                    self.components.extend(add_components(os.path.join(root, fn)))
                 if fn == 'pspec.xml' and not skip_sources:
-                    self.add_spec(os.path.join(root, fn), repo_uri)
+                    specs.append((os.path.join(root, fn), repo_uri))
                 if fn == 'distribution.xml':
-                    self.add_distro(os.path.join(root, fn))
+                    self.distribution = add_distro(os.path.join(root, fn))
                 if fn == 'groups.xml':
-                    self.add_groups(os.path.join(root, fn))
+                    self.groups.extend(add_groups(os.path.join(root, fn)))
 
+        # Create a process pool, as many processes as the number of CPUs we
+        # have
+        pool = multiprocessing.Pool()
+
         try:
+            # Add source packages to index using a process pool
+            self.specs = pool.map(add_spec, specs)
+        except:
+            # If an exception occurs (like a keyboard interrupt), immediately
+            # terminate worker processes and propagate exception. (CLI honors
+            # KeyboardInterrupt exception, if you're not using CLI, you must
+            # handle KeyboardException yourself)
+
+            pool.terminate()
+            raise
+
+        try:
             obsoletes_list = map(str, self.distribution.obsoletes)
         except AttributeError:
             obsoletes_list = []
 
+        latest_packages = []
+
         for pkg in util.filter_latest_packages(packages):
             pkg_name = util.parse_package_name(os.path.basename(pkg))[0]
             if pkg_name.endswith(ctx.const.debug_name_suffix):
                 pkg_name = util.remove_suffix(ctx.const.debug_name_suffix,
                                               pkg_name)
             if pkg_name not in obsoletes_list:
-                ctx.ui.info(_('Adding %s to package index') % pkg)
-                self.add_package(pkg, deltas, repo_uri)
+                # Currently, multiprocessing.Pool.map method accepts methods
+                # with single parameters only. So we have to send our
+                # parameters as a tuple to workaround that
 
-    def add_package(self, path, deltas, repo_uri):
+                latest_packages.append((pkg, deltas, repo_uri))
+
+        try:
+            # Add binary packages to index using a process pool
+            self.packages = pool.map(add_package, latest_packages)
+        except:
+            pool.terminate()
+            raise
+        else:
+            # Clean up output
+            ctx.ui.info("\r%-80.80s" % (_('Done.')))
+
+def add_package(params):
+    try:
+        path, deltas, repo_uri = params
+
+        ctx.ui.info("\r%-80.80s" % (_('Adding package to index: %s') % 
+            os.path.basename(path)), noln = True)
+
         package = pisi.package.Package(path, 'r')
         md = package.get_metadata()
         md.package.packageSize = long(os.path.getsize(path))
@@ -160,43 +202,56 @@
 
                     md.package.deltaPackages.append(delta)
 
-            self.packages.append(md.package)
+        return md.package
 
-    def add_groups(self, path):
-        ctx.ui.info("Adding groups.xml to index...")
-        groups_xml = group.Groups()
-        groups_xml.read(path)
-        for grp in groups_xml.groups:
-            self.groups.append(grp)
+    except KeyboardInterrupt:
+        # Handle KeyboardInterrupt exception to prevent ugly backtrace of all
+        # worker processes and propagate the exception to main process.
+        #
+        # Probably it's better to use just 'raise' here, but multiprocessing
+        # module has some bugs about that: (python#8296, python#9205 and
+        # python#9207 )
+        #
+        # For now, worker processes do not propagate exceptions other than
+        # Exception (like KeyboardInterrupt), so we have to manually propagate
+        # KeyboardInterrupt exception as an Exception.
 
-    def add_components(self, path):
-        ctx.ui.info("Adding components.xml to index...")
-        components_xml = component.Components()
-        components_xml.read(path)
-        #try:
-        for comp in components_xml.components:
-            self.components.append(comp)
-        #except:
-        #    raise Error(_('Component in %s is corrupt') % path)
-            #ctx.ui.error(str(Error(*errs)))
+        raise Exception
 
-    def add_distro(self, path):
-        ctx.ui.info("Adding distribution.xml to index...")
-        distro = component.Distribution()
-        #try:
-        distro.read(path)
-        self.distribution = distro
-        #except:
-        #    raise Error(_('Distribution in %s is corrupt') % path)
-            #ctx.ui.error(str(Error(*errs)))
+def add_groups(path):
+    ctx.ui.info("Adding groups.xml to index...")
+    groups_xml = group.Groups()
+    groups_xml.read(path)
+    return groups_xml.groups
 
-    def add_spec(self, path, repo_uri):
-        import pisi.operations.build
+def add_components(path):
+    ctx.ui.info("Adding components.xml to index...")
+    components_xml = component.Components()
+    components_xml.read(path)
+    #try:
+    return components_xml.components
+    #except:
+    #    raise Error(_('Component in %s is corrupt') % path)
+    #ctx.ui.error(str(Error(*errs)))
+
+def add_distro(path):
+    ctx.ui.info("Adding distribution.xml to index...")
+    distro = component.Distribution()
+    #try:
+    distro.read(path)
+    return distro
+    #except:
+    #    raise Error(_('Distribution in %s is corrupt') % path)
+    #ctx.ui.error(str(Error(*errs)))
+
+def add_spec(params):
+    try:
+        path , repo_uri = params
         ctx.ui.info(_('Adding %s to source index') % path)
         #TODO: may use try/except to handle this
         builder = pisi.operations.build.Builder(path)
-            #ctx.ui.error(_('SpecFile in %s is corrupt, skipping...') % path)
-            #ctx.ui.error(str(Error(*errs)))
+        #ctx.ui.error(_('SpecFile in %s is corrupt, skipping...') % path)
+        #ctx.ui.error(str(Error(*errs)))
         builder.fetch_component()
         sf = builder.spec
         if ctx.config.options and ctx.config.options.absolute_urls:
@@ -204,4 +259,8 @@
         else:                           # create relative path by default
             sf.source.sourceURI = util.removepathprefix(repo_uri, path)
             # check component
-        self.specs.append(sf)
+        return sf
+
+    except KeyboardInterrupt:
+        # Multiprocessing hack, see add_package method for explanation
+        raise Exception

Attachment: signature.asc
Description: This is a digitally signed message part.

_______________________________________________
Gelistirici mailing list
Gelistirici@pardus.org.tr
http://liste.pardus.org.tr/mailman/listinfo/gelistirici

Cevap