09 Mayıs 2011 Pazartesi günü (saat 18:06:35) Fatih Aşıcı şunları yazmıştı: > On Thu, 28 Apr 2011 16:48:54 +0000, Gökçen Eraslan <gok...@pardus.org.tr> > > wrote: > > Selamlar, > > Selamlar, >
Selam, > > Uzun zamandır aklımda olan bir şey pisi'nin index kodunu biraz > > hızlandırmak. > > Farmı her dürttüğümde hem ikili deponun hem debug deposunun > > index'lenmesini > > beklemek çok uzun sürüyor. > > > > Index hızlandırma işi ne zamandır aklımdan çıkmış olsa da, geçenlerde > > aklıma > > gelen "Biz neden hala farmı elle çalıştırıyoruz, her paket commit > > edildiğinde > > otomatik derlemeye başlasa ya!?!" düşüncesini sesli dile getirdiğimde, > > Onur > > bunun için index'in de hızlandırılması gerektiğini söylediğinde tekrar > > alevlendi. Zira derlenmesi üçer saniye süren üç paketi arka arkaya commit > > ettiğimizde indexlerle birlikte totalde dakikalarca indexleme yapılmasını > > beklemek akıl kârı değil. > > > > Ozan'la birlikte geçenlerde yaptığımız girişimler sonucunda ekteki yama > > çıktı > > ortaya. Özetle normalde index.py'deki Index sınıfının index methodunda > > for döngüsü içinde her paket için tek tek yapılan pspec.xml/metadata.xml > > parse etme, sha1sum hesaplama işlemini Python'un harika multiprocessing > > modülünü kullanarak bir süreç havuzunda yaptırttık ve 2011 devel > > farmında indeksleme > > işlemi 1dk 56sn'den, 49sn'ye indi. Bu arada bugün bir değişiklik daha > > yapıp, > > kaynak paketlerin indekslenmesi için de aynı paralelleşmeyi sağladım. > > Kaynak > > depolar için de test edebilirsiniz. > > Elinize sağlık. Arada print'li satırlar kalmış. Bunları ctx.ui kullanacak > hale getirmelisiniz eğer gerekliyse. Bir de bazı satırlar (özellikle yorum > satırları) 79 sütun sınırını geçmiş. Bunları da kırpalım. > Bunları hallettim sanırım, güncel yamayı ekte gönderiyorum. PS: Bunun için "set cc=80" kullandım vim'de, baya güzel oluyormuş. Bir de, 'gq' kombinasyonu da otomatik 79'a indiriyor seçili bölümü. Bir de "set tw=80" var, tabi. > > Ortaya çıkan beğenmediğim birkaç husus var: > > > > 1- index.py nesne temelli tasarımdan biraz uzaklaşmış oldu. Bunun nedeni > > multiprocessing methodlarının sınıf methodları üzerinde çalışmıyor oluşu. > > Sınıf methodları değil muhakkak normal fonksiyonlar vermek gerekiyor > > multiprocessing modülüne paralelleştirme için. > > Sorun değil. Pisi'deki her modül nesne temelli tasarlanmamış zaten. > > > 2- Pool.map fonksiyonu sadece tek parametre alan fonksiyonlarda çalıştığı > > için, add_* fonksiyonlarının parametre sayısını bire indirmek zorunda > > kaldık. > > Bunun daha şık bir yolu vardır belki. Şimdilik böyle olsun. > > > 3- Klavye kesmelerini yakalamak için küçük bir hack yapmak gerekiyor, > > detaylar > > yamanın içinde yazıyor yorum olarak. Bununla ilgili python'da 3 tane > > açık hata var, yamada yazdım neler olduğunu. Bakıp yorumlayabilirsiniz. > > > > 4- Nadiren, Control+C'ye iki kez basmak gerekebiliyor indeks işlemini > > iptal > > etmek için. Üstteki durumdan kaynaklanıyor bu da. > > > > Yamaya bir göz atın, deneyin, test edin, performansını ölçün. > > > > Ben bu üsttekilerin engelleyici sorunlar olduğunu düşünmüyorum yamayı > > içeri > > almak için ame yine de burada tartışalım. > > +1. Benden OK. > Commit ediyorum bu halini o zaman. Pakete alayım mı peki, pisi release edebilecek misin? Ya da biz edelim mi? Nasıl yapalım? > > Not: İndeks çıktısında daha önce indekslenen tüm paketler yeni satırla > > bölünerek destan gibi ekrana basılıyordu. Yamada bunu da değiştirdim, > > "\r%-80.80s" şeklinde tek satırda yazdırıyorum artık tüm paket > > isimlerini. > > Güzel olmuş. Ben de index'in gereksiz yere uzun bir çıktı verdiğini > düşünüyordum. > > _______________________________________________ > Gelistirici mailing list > Gelistirici@pardus.org.tr > http://liste.pardus.org.tr/mailman/listinfo/gelistirici -- Gökçen Eraslan
Index: pisi/index.py =================================================================== --- pisi/index.py (revision 36844) +++ pisi/index.py (working copy) @@ -14,6 +14,7 @@ import os import shutil +import multiprocessing import gettext __trans = gettext.translation('pisi', fallback=True) @@ -30,7 +31,9 @@ import pisi.pxml.autoxml as autoxml import pisi.component as component import pisi.group as group +import pisi.operations.build + class Error(pisi.Error): pass @@ -85,7 +88,9 @@ self.repo_dir = repo_uri packages = [] + specs = [] deltas = {} + for root, dirs, files in os.walk(repo_uri): for fn in files: @@ -96,29 +101,66 @@ packages.append(os.path.join(root, fn)) if fn == 'components.xml': - self.add_components(os.path.join(root, fn)) + self.components.extend(add_components(os.path.join(root, fn))) if fn == 'pspec.xml' and not skip_sources: - self.add_spec(os.path.join(root, fn), repo_uri) + specs.append((os.path.join(root, fn), repo_uri)) if fn == 'distribution.xml': - self.add_distro(os.path.join(root, fn)) + self.distribution = add_distro(os.path.join(root, fn)) if fn == 'groups.xml': - self.add_groups(os.path.join(root, fn)) + self.groups.extend(add_groups(os.path.join(root, fn))) + # Create a process pool, as many processes as the number of CPUs we + # have + pool = multiprocessing.Pool() + try: + # Add source packages to index using a process pool + self.specs = pool.map(add_spec, specs) + except: + # If an exception occurs (like a keyboard interrupt), immediately + # terminate worker processes and propagate exception. (CLI honors + # KeyboardInterrupt exception, if you're not using CLI, you must + # handle KeyboardException yourself) + + pool.terminate() + raise + + try: obsoletes_list = map(str, self.distribution.obsoletes) except AttributeError: obsoletes_list = [] + latest_packages = [] + for pkg in util.filter_latest_packages(packages): pkg_name = util.parse_package_name(os.path.basename(pkg))[0] if pkg_name.endswith(ctx.const.debug_name_suffix): pkg_name = util.remove_suffix(ctx.const.debug_name_suffix, pkg_name) if pkg_name not in obsoletes_list: - ctx.ui.info(_('Adding %s to package index') % pkg) - self.add_package(pkg, deltas, repo_uri) + # Currently, multiprocessing.Pool.map method accepts methods + # with single parameters only. So we have to send our + # parameters as a tuple to workaround that - def add_package(self, path, deltas, repo_uri): + latest_packages.append((pkg, deltas, repo_uri)) + + try: + # Add binary packages to index using a process pool + self.packages = pool.map(add_package, latest_packages) + except: + pool.terminate() + raise + else: + # Clean up output + ctx.ui.info("\r%-80.80s" % (_('Done.'))) + +def add_package(params): + try: + path, deltas, repo_uri = params + + ctx.ui.info("\r%-80.80s" % (_('Adding package to index: %s') % + os.path.basename(path)), noln = True) + package = pisi.package.Package(path, 'r') md = package.get_metadata() md.package.packageSize = long(os.path.getsize(path)) @@ -160,43 +202,56 @@ md.package.deltaPackages.append(delta) - self.packages.append(md.package) + return md.package - def add_groups(self, path): - ctx.ui.info("Adding groups.xml to index...") - groups_xml = group.Groups() - groups_xml.read(path) - for grp in groups_xml.groups: - self.groups.append(grp) + except KeyboardInterrupt: + # Handle KeyboardInterrupt exception to prevent ugly backtrace of all + # worker processes and propagate the exception to main process. + # + # Probably it's better to use just 'raise' here, but multiprocessing + # module has some bugs about that: (python#8296, python#9205 and + # python#9207 ) + # + # For now, worker processes do not propagate exceptions other than + # Exception (like KeyboardInterrupt), so we have to manually propagate + # KeyboardInterrupt exception as an Exception. - def add_components(self, path): - ctx.ui.info("Adding components.xml to index...") - components_xml = component.Components() - components_xml.read(path) - #try: - for comp in components_xml.components: - self.components.append(comp) - #except: - # raise Error(_('Component in %s is corrupt') % path) - #ctx.ui.error(str(Error(*errs))) + raise Exception - def add_distro(self, path): - ctx.ui.info("Adding distribution.xml to index...") - distro = component.Distribution() - #try: - distro.read(path) - self.distribution = distro - #except: - # raise Error(_('Distribution in %s is corrupt') % path) - #ctx.ui.error(str(Error(*errs))) +def add_groups(path): + ctx.ui.info("Adding groups.xml to index...") + groups_xml = group.Groups() + groups_xml.read(path) + return groups_xml.groups - def add_spec(self, path, repo_uri): - import pisi.operations.build +def add_components(path): + ctx.ui.info("Adding components.xml to index...") + components_xml = component.Components() + components_xml.read(path) + #try: + return components_xml.components + #except: + # raise Error(_('Component in %s is corrupt') % path) + #ctx.ui.error(str(Error(*errs))) + +def add_distro(path): + ctx.ui.info("Adding distribution.xml to index...") + distro = component.Distribution() + #try: + distro.read(path) + return distro + #except: + # raise Error(_('Distribution in %s is corrupt') % path) + #ctx.ui.error(str(Error(*errs))) + +def add_spec(params): + try: + path , repo_uri = params ctx.ui.info(_('Adding %s to source index') % path) #TODO: may use try/except to handle this builder = pisi.operations.build.Builder(path) - #ctx.ui.error(_('SpecFile in %s is corrupt, skipping...') % path) - #ctx.ui.error(str(Error(*errs))) + #ctx.ui.error(_('SpecFile in %s is corrupt, skipping...') % path) + #ctx.ui.error(str(Error(*errs))) builder.fetch_component() sf = builder.spec if ctx.config.options and ctx.config.options.absolute_urls: @@ -204,4 +259,8 @@ else: # create relative path by default sf.source.sourceURI = util.removepathprefix(repo_uri, path) # check component - self.specs.append(sf) + return sf + + except KeyboardInterrupt: + # Multiprocessing hack, see add_package method for explanation + raise Exception
signature.asc
Description: This is a digitally signed message part.
_______________________________________________ Gelistirici mailing list Gelistirici@pardus.org.tr http://liste.pardus.org.tr/mailman/listinfo/gelistirici