On Mon, 9 May 2011 02:46:09 +0300 Onur Küçük <o...@pardus.org.tr> wrote:
>
> On Thu, 28 Apr 2011 16:48:54 +0000
> Gökçen Eraslan <gok...@pardus.org.tr> wrote:
>
> > Ozan'la birlikte geçenlerde yaptığımız girişimler sonucunda ekteki
> > yama çıktı ortaya. Özetle normalde index.py'deki Index sınıfının
> > index methodunda for döngüsü içinde her paket için tek tek yapılan
> > pspec.xml/metadata.xml parse etme, sha1sum hesaplama işlemini
> > Python'un harika multiprocessing modülünü kullanarak bir süreç
> > havuzunda yaptırttık ve 2011 devel farmında indeksleme işlemi 1dk
> > 56sn'den, 49sn'ye indi. Bu arada bugün bir değişiklik daha yapıp,
> > kaynak paketlerin indekslenmesi için de aynı paralelleşmeyi
> > sağladım. Kaynak depolar için de test edebilirsiniz.
>
> Pek leziz olmuş, elinize sağlık. multiprocessing yüzünden nesne
> temelinden uzaklaştık ama bu değişiklik için bence değer, daha sonra
> sakin bir kafayla belki bu işi biraz daha farklı yaparız.

Ekte de 2009'a uyarlanmış hali var. Sırf sha1sum hesaplanmasının
paralelleşmesi bile bayağı bir performans kazandırıyor (%50-80), güzel
oldu bu :)

-- 
Onur Küçük                                    Knowledge speaks,
<onur.--.-.pardus.org.tr>                      but wisdom listens
diff -Nur pisi-2.3.2-old/pisi/index.py pisi-2.3.2/pisi/index.py --- pisi-2.3.2-old/pisi/index.py 2011-05-09 02:47:29.146619068 +0300 +++ pisi-2.3.2/pisi/index.py 2011-05-09 02:59:29.216629354 +0300 @@ -14,6 +14,7 @@ import os import shutil +import multiprocessing import gettext __trans = gettext.translation('pisi', fallback=True) @@ -30,6 +31,8 @@ import pisi.pxml.autoxml as autoxml import pisi.component as component import pisi.group as group +import pisi.operations.build + class Error(pisi.Error): pass @@ -85,7 +88,9 @@ self.repo_dir = repo_uri packages = [] + specs = [] deltas = {} + for root, dirs, files in os.walk(repo_uri): for fn in files: @@ -96,26 +101,61 @@ packages.append(os.path.join(root, fn)) if fn == 'components.xml': - self.add_components(os.path.join(root, fn)) + self.components.extend(add_components(os.path.join(root, fn))) if fn == 'pspec.xml' and not skip_sources: - self.add_spec(os.path.join(root, fn), repo_uri) + specs.append((os.path.join(root, fn), repo_uri)) if fn == 'distribution.xml': - self.add_distro(os.path.join(root, fn)) + self.distribution = add_distro(os.path.join(root, fn)) if fn == 'groups.xml': - self.add_groups(os.path.join(root, fn)) + self.groups.extend(add_groups(os.path.join(root, fn))) + + # Create a process pool, as many processes as the number of CPUs we have + pool = multiprocessing.Pool() + + try: + # Add source packages to index using a process pool + self.specs = pool.map(add_spec, specs) + except: + # If an exception occurs (like a keyboard interrupt), immediately terminate worker + # processes and propagate exception. 
(CLI honors KeyboardInterrupt exception, if you're + # not using CLI, you must handle KeyboardException yourself) + pool.terminate() + raise try: obsoletes_list = map(str, self.distribution.obsoletes) except AttributeError: obsoletes_list = [] + latest_packages = [] + for pkg in util.filter_latest_packages(packages): pkg_name = util.parse_package_name(os.path.basename(pkg))[0] if pkg_name not in obsoletes_list: - ctx.ui.info(_('Adding %s to package index') % pkg) - self.add_package(pkg, deltas, repo_uri) + # Currently, multiprocessing.Pool.map method accepts methods + # with single parameters only. So we have to send our parameters + # as a tuple to workaround that + + latest_packages.append((pkg, deltas, repo_uri)) + + print "Starting binary..." + try: + # Add binary packages to index using a process pool + self.packages = pool.map(add_package, latest_packages) + except: + pool.terminate() + raise + else: + # Clean up output + ctx.ui.info("\r%-80.80s" % (_('Done.'))) + print "Done" + +def add_package(params): + try: + path, deltas, repo_uri = params + + ctx.ui.info("\r%-80.80s" % (_('Adding package to index: %s') % os.path.basename(path)), noln = True) - def add_package(self, path, deltas, repo_uri): package = pisi.package.Package(path, 'r') md = package.get_metadata() md.package.packageSize = long(os.path.getsize(path)) @@ -149,43 +189,54 @@ if int(buildTo) == int(md.package.build): md.package.deltaPackages.append(delta) - self.packages.append(md.package) - - def add_groups(self, path): - ctx.ui.info("Adding groups.xml to index...") - groups_xml = group.Groups() - groups_xml.read(path) - for grp in groups_xml.groups: - self.groups.append(grp) - - def add_components(self, path): - ctx.ui.info("Adding components.xml to index...") - components_xml = component.Components() - components_xml.read(path) - #try: - for comp in components_xml.components: - self.components.append(comp) - #except: - # raise Error(_('Component in %s is corrupt') % path) - 
#ctx.ui.error(str(Error(*errs))) - - def add_distro(self, path): - ctx.ui.info("Adding distribution.xml to index...") - distro = component.Distribution() - #try: - distro.read(path) - self.distribution = distro - #except: - # raise Error(_('Distribution in %s is corrupt') % path) - #ctx.ui.error(str(Error(*errs))) + return md.package - def add_spec(self, path, repo_uri): - import pisi.operations.build + except KeyboardInterrupt: + # Handle KeyboardInterrupt exception to prevent ugly backtrace of all worker processes + # and propagate the exception to main process. + # + # Probably it's better to use just 'raise' here, but multiprocessing module has some bugs about that: + # (python#8296, python#9205 and python#9207 ) + # + # For now, worker processes do not propagate exceptions other than Exception (like KeyboardInterrupt), + # so we have to manually propagate KeyboardInterrupt exception as an Exception. + + raise Exception + +def add_groups(path): + ctx.ui.info("Adding groups.xml to index...") + groups_xml = group.Groups() + groups_xml.read(path) + return groups_xml.groups + +def add_components(path): + ctx.ui.info("Adding components.xml to index...") + components_xml = component.Components() + components_xml.read(path) + #try: + return components_xml.components + #except: + # raise Error(_('Component in %s is corrupt') % path) + #ctx.ui.error(str(Error(*errs))) + +def add_distro(path): + ctx.ui.info("Adding distribution.xml to index...") + distro = component.Distribution() + #try: + distro.read(path) + return distro + #except: + # raise Error(_('Distribution in %s is corrupt') % path) + #ctx.ui.error(str(Error(*errs))) + +def add_spec(params): + try: + path , repo_uri = params ctx.ui.info(_('Adding %s to source index') % path) #TODO: may use try/except to handle this builder = pisi.operations.build.Builder(path) - #ctx.ui.error(_('SpecFile in %s is corrupt, skipping...') % path) - #ctx.ui.error(str(Error(*errs))) + #ctx.ui.error(_('SpecFile in %s is corrupt, 
skipping...') % path) + #ctx.ui.error(str(Error(*errs))) builder.fetch_component() sf = builder.spec if ctx.config.options and ctx.config.options.absolute_urls: @@ -193,4 +244,8 @@ else: # create relative path by default sf.source.sourceURI = util.removepathprefix(repo_uri, path) # check component - self.specs.append(sf) + return sf + + except KeyboardInterrupt: + # Multiprocessing hack, see add_package method for explanation + raise Exception
_______________________________________________
Gelistirici mailing list
Gelistirici@pardus.org.tr
http://liste.pardus.org.tr/mailman/listinfo/gelistirici