This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit ca909d9c41f074858dd4d24d9b3cc226b7a1c3f8
Author: Jürg Billeter <[email protected]>
AuthorDate: Mon Dec 14 19:43:40 2020 +0100

    Move artifact and source cache query to a job thread
    
    This allows parallelization of cache queries.
---
 src/buildstream/_elementsources.py                 |  5 +-
 src/buildstream/_frontend/cli.py                   |  2 +
 src/buildstream/_loader/loader.py                  |  1 +
 src/buildstream/_scheduler/__init__.py             |  1 +
 .../_scheduler/queues/cachequeryqueue.py           | 66 ++++++++++++++++++++++
 src/buildstream/_scheduler/queues/fetchqueue.py    |  2 +-
 src/buildstream/_scheduler/queues/pullqueue.py     |  4 --
 src/buildstream/_stream.py                         | 44 +++++++++++++++
 src/buildstream/element.py                         | 10 +---
 tests/artifactcache/push.py                        |  3 +
 tests/frontend/fetch.py                            |  8 +--
 tests/frontend/track.py                            | 12 ++--
 tests/sourcecache/fetch.py                         |  6 ++
 tests/sourcecache/push.py                          |  2 +
 tests/sourcecache/staging.py                       |  3 +
 tests/sources/git.py                               |  6 +-
 16 files changed, 151 insertions(+), 24 deletions(-)

diff --git a/src/buildstream/_elementsources.py b/src/buildstream/_elementsources.py
index 9b4afe4..d426ee6 100644
--- a/src/buildstream/_elementsources.py
+++ b/src/buildstream/_elementsources.py
@@ -211,6 +211,9 @@ class ElementSources:
     #    SourceError: If one of the element sources has an error
     #
     def fetch(self):
+        if self._cached is None:
+            self.query_cache()
+
         if self.cached():
             return
 
@@ -387,8 +390,6 @@ class ElementSources:
         unique_key = self.get_unique_key()
         self._cache_key = _cachekey.generate_key(unique_key)
 
-        self.query_cache()
-
     # preflight():
     #
     # A internal wrapper for calling the abstract preflight() method on
diff --git a/src/buildstream/_frontend/cli.py b/src/buildstream/_frontend/cli.py
index ab06e8a..cad1ee0 100644
--- a/src/buildstream/_frontend/cli.py
+++ b/src/buildstream/_frontend/cli.py
@@ -551,6 +551,8 @@ def show(app, elements, deps, except_, order, format_):
 
         dependencies = app.stream.load_selection(elements, selection=deps, except_targets=except_)
 
+        app.stream.query_cache(dependencies)
+
         if order == "alpha":
             dependencies = sorted(dependencies)
 
diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py
index 6ace362..3d0fb65 100644
--- a/src/buildstream/_loader/loader.py
+++ b/src/buildstream/_loader/loader.py
@@ -830,6 +830,7 @@ class Loader:
 
         # Handle the case where a subproject needs to be fetched
         #
+        element._query_source_cache()
         if element._should_fetch():
             self.load_context.fetch_subprojects([element])
 
diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
index d2f458f..fcde00d 100644
--- a/src/buildstream/_scheduler/__init__.py
+++ b/src/buildstream/_scheduler/__init__.py
@@ -25,6 +25,7 @@ from .queues.trackqueue import TrackQueue
 from .queues.buildqueue import BuildQueue
 from .queues.artifactpushqueue import ArtifactPushQueue
 from .queues.pullqueue import PullQueue
+from .queues.cachequeryqueue import CacheQueryQueue
 
 from .scheduler import Scheduler, SchedStatus
 from .jobs import ElementJob, JobStatus
diff --git a/src/buildstream/_scheduler/queues/cachequeryqueue.py b/src/buildstream/_scheduler/queues/cachequeryqueue.py
new file mode 100644
index 0000000..f447ab5
--- /dev/null
+++ b/src/buildstream/_scheduler/queues/cachequeryqueue.py
@@ -0,0 +1,66 @@
+#
+#  Copyright (C) 2020 Bloomberg Finance LP
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.         See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+from . import Queue, QueueStatus
+from ..resources import ResourceType
+from ..jobs import JobStatus
+from ...types import _KeyStrength
+
+
+# A queue which queries the cache for artifacts and sources
+#
+class CacheQueryQueue(Queue):
+
+    action_name = "Cache-query"
+    complete_name = "Cache queried"
+    resources = [ResourceType.PROCESS, ResourceType.CACHE]
+
+    def __init__(self, scheduler, *, sources=False):
+        super().__init__(scheduler)
+
+        self._query_sources = sources
+
+    def get_process_func(self):
+        if not self._query_sources:
+            return CacheQueryQueue._query_artifacts_or_sources
+        else:
+            return CacheQueryQueue._query_sources
+
+    def status(self, element):
+        if not element._get_cache_key(strength=_KeyStrength.WEAK):
+            # Strict and weak cache keys are unavailable if the element or
+            # a dependency has an unresolved source
+            return QueueStatus.SKIP
+
+        return QueueStatus.READY
+
+    def done(self, _, element, result, status):
+        if status is JobStatus.FAIL:
+            return
+
+        if not self._query_sources:
+            if not element._pull_pending():
+                element._load_artifact_done()
+
+    @staticmethod
+    def _query_artifacts_or_sources(element):
+        element._load_artifact(pull=False)
+        if not element._can_query_cache() or not element._cached_success():
+            element._query_source_cache()
+
+    @staticmethod
+    def _query_sources(element):
+        element._query_source_cache()
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index 3a4183d..ee84982 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -56,7 +56,7 @@ class FetchQueue(Queue):
         # This will automatically skip elements which
         # have no sources.
 
-        if not element._should_fetch(self._should_fetch_original):
+        if element._can_query_source_cache() and not element._should_fetch(self._should_fetch_original):
             return QueueStatus.SKIP
 
         return QueueStatus.READY
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index ecff02c..9860256 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -43,10 +43,6 @@ class PullQueue(Queue):
             return QueueStatus.SKIP
 
     def done(self, _, element, result, status):
-
-        if status is JobStatus.FAIL:
-            return
-
         element._load_artifact_done()
 
     @staticmethod
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index e05100f..0558a12 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -37,6 +37,7 @@ from ._scheduler import (
     Scheduler,
     SchedStatus,
     TrackQueue,
+    CacheQueryQueue,
     FetchQueue,
     SourcePushQueue,
     BuildQueue,
@@ -162,6 +163,25 @@ class Stream:
 
             return target_objects
 
+    # query_cache()
+    #
+    # Query the artifact and source caches to determine the cache status
+    # of the specified elements.
+    #
+    # Args:
+    #    elements (list of Element): The elements to check
+    #
+    def query_cache(self, elements, *, sources=False):
+        with self._context.messenger.timed_activity("Query cache", silent_nested=True):
+            # Enqueue complete build plan as this is required to determine `buildable` status.
+            plan = list(self._pipeline.dependencies(elements, _Scope.ALL))
+
+            self._scheduler.clear_queues()
+            self._add_queue(CacheQueryQueue(self._scheduler, sources=sources), track=True)
+            self._enqueue_plan(plan)
+            self._run()
+            self._scheduler.clear_queues()
+
     # shell()
     #
     # Run a shell
@@ -208,6 +228,8 @@ class Stream:
             element = self.targets[0]
             element._set_required(scope)
 
+            self.query_cache([element] + elements)
+
             if pull_:
                 self._scheduler.clear_queues()
                 self._add_queue(PullQueue(self._scheduler))
@@ -244,6 +266,7 @@ class Stream:
 
         # Ensure we have our sources if we are launching a build shell
         if scope == _Scope.BUILD and not usebuildtree:
+            self.query_cache([element], sources=True)
             self._fetch([element])
             self._pipeline.assert_sources_cached([element])
 
@@ -294,6 +317,8 @@ class Stream:
                 for element in self.targets:
                     element._set_artifact_files_required(scope=scope)
 
+        self.query_cache(elements)
+
         # Now construct the queues
         #
         self._scheduler.clear_queues()
@@ -339,6 +364,8 @@ class Stream:
             source_remote_url=remote,
         )
 
+        self.query_cache(elements, sources=True)
+
         # Delegated to a shared fetch method
         self._fetch(elements, announce_session=True)
 
@@ -403,6 +430,8 @@ class Stream:
             load_artifacts=True,
         )
 
+        self.query_cache(elements, sources=True)
+
         if not self._sourcecache.has_push_remotes():
             raise StreamError("No source caches available for pushing sources")
 
@@ -447,6 +476,9 @@ class Stream:
             raise StreamError("No artifact caches available for pulling artifacts")
 
         self._pipeline.assert_consistent(elements)
+
+        self.query_cache(elements)
+
         self._scheduler.clear_queues()
         self._add_queue(PullQueue(self._scheduler))
         self._enqueue_plan(elements)
@@ -489,6 +521,8 @@ class Stream:
 
         self._pipeline.assert_consistent(elements)
 
+        self.query_cache(elements)
+
         self._scheduler.clear_queues()
         self._add_queue(PullQueue(self._scheduler))
         self._add_queue(ArtifactPushQueue(self._scheduler))
@@ -539,6 +573,8 @@ class Stream:
 
         self._check_location_writable(location, force=force, tar=tar)
 
+        self.query_cache(elements)
+
         uncached_elts = [elt for elt in elements if not elt._cached()]
         if uncached_elts and pull:
             self._context.messenger.info("Attempting to fetch missing or incomplete artifact")
@@ -617,6 +653,8 @@ class Stream:
             targets, selection=selection, use_artifact_config=True, load_artifacts=True
         )
 
+        self.query_cache(target_objects)
+
         if self._artifacts.has_fetch_remotes():
             self._pipeline.check_remotes(target_objects)
 
@@ -636,6 +674,8 @@ class Stream:
         # Return list of Element and/or ArtifactElement objects
         target_objects = self.load_selection(targets, selection=_PipelineSelection.NONE, load_artifacts=True)
 
+        self.query_cache(target_objects)
+
         artifact_logs = {}
         for obj in target_objects:
             ref = obj.get_artifact_name()
@@ -664,6 +704,8 @@ class Stream:
         # Return list of Element and/or ArtifactElement objects
         target_objects = self.load_selection(targets, selection=_PipelineSelection.NONE, load_artifacts=True)
 
+        self.query_cache(target_objects)
+
         elements_to_files = {}
         for obj in target_objects:
             ref = obj.get_artifact_name()
@@ -742,6 +784,7 @@ class Stream:
         elements = self._load((target,), selection=deps, except_targets=except_targets)
 
         # Assert all sources are cached in the source dir
+        self.query_cache(elements, sources=True)
         self._fetch(elements)
         self._pipeline.assert_sources_cached(elements)
 
@@ -775,6 +818,7 @@ class Stream:
         # If we're going to checkout, we need at least a fetch,
         #
         if not no_checkout:
+            self.query_cache(elements, sources=True)
             self._fetch(elements, fetch_original=True)
 
         expanded_directories = []
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index f5f8a71..7dcbc32 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1945,6 +1945,9 @@ class Element(Plugin):
         self.__artifact = artifact
         return pulled
 
+    def _query_source_cache(self):
+        self.__sources.query_cache()
+
     def _skip_source_push(self):
         if not self.sources() or self._get_workspace():
             return True
@@ -3256,13 +3259,6 @@ class Element(Plugin):
             # In strict mode, the strong cache key always matches the strict cache key
             self.__cache_key = self.__strict_cache_key
 
-        # If we've newly calculated a cache key, our artifact's
-        # current state will also change - after all, we can now find
-        # a potential existing artifact.
-        self._load_artifact(pull=False)
-        if not self._pull_pending():
-            self._load_artifact_done()
-
         # Update the message kwargs in use for this plugin to dispatch messages with
         #
         self._message_kwargs["element_key"] = self._get_display_key()
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index 74062ce..17ad2e2 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -43,6 +43,9 @@ def _push(cli, cache_dir, project_dir, config_file, target):
         artifactcache.setup_remotes(use_config=True)
         artifactcache.initialize_remotes()
 
+        # Query local cache
+        element._load_artifact(pull=False)
+
         assert artifactcache.has_push_remotes(plugin=element), "No remote configured for element target.bst"
         assert element._push(), "Push operation failed"
 
diff --git a/tests/frontend/fetch.py b/tests/frontend/fetch.py
index b2c9d64..6c8a4b7 100644
--- a/tests/frontend/fetch.py
+++ b/tests/frontend/fetch.py
@@ -62,10 +62,9 @@ def test_fetch_deps(cli, datafiles, deps, expected_states):
 def test_fetch_consistency_error(cli, datafiles):
     project = str(datafiles)
 
-    # When the error occurs outside of the scheduler at load time,
-    # then the SourceError is reported directly as the main error.
     result = cli.run(project=project, args=["source", "fetch", "error.bst"])
-    result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+    result.assert_main_error(ErrorDomain.STREAM, None)
+    result.assert_task_error(ErrorDomain.SOURCE, "the-consistency-error")
 
 
 @pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
@@ -73,7 +72,8 @@ def test_fetch_consistency_bug(cli, datafiles):
     project = str(datafiles)
 
     result = cli.run(project=project, args=["source", "fetch", "bug.bst"])
-    result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
+    result.assert_main_error(ErrorDomain.STREAM, None)
+    result.assert_task_error(ErrorDomain.PLUGIN, "source-bug")
 
 
 @pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/frontend/track.py b/tests/frontend/track.py
index bd84449..d1a9324 100644
--- a/tests/frontend/track.py
+++ b/tests/frontend/track.py
@@ -248,20 +248,22 @@ def test_track_cross_junction(cli, tmpdir, datafiles, cross_junction, ref_storag
 def test_track_consistency_error(cli, datafiles):
     project = str(datafiles)
 
-    # Track the element causing a consistency error
+    # Track the element causing a consistency error in `is_cached()`
     result = cli.run(project=project, args=["source", "track", "error.bst"])
-    result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+
+    # We expect tracking to succeed as `is_cached()` is not required for tracking.
+    result.assert_success()
 
 
 @pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
 def test_track_consistency_bug(cli, datafiles):
     project = str(datafiles)
 
-    # Track the element causing an unhandled exception
+    # Track the element causing an unhandled exception in `is_cached()`
     result = cli.run(project=project, args=["source", "track", "bug.bst"])
 
-    # We expect BuildStream to fail gracefully, with no recorded exception.
-    result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
+    # We expect tracking to succeed as `is_cached()` is not required for tracking.
+    result.assert_success()
 
 
 @pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/sourcecache/fetch.py b/tests/sourcecache/fetch.py
index 76f5508..40076e4 100644
--- a/tests/sourcecache/fetch.py
+++ b/tests/sourcecache/fetch.py
@@ -75,6 +75,7 @@ def test_source_fetch(cli, tmpdir, datafiles):
 
             element = project.load_elements([element_name])[0]
             element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
@@ -117,6 +118,7 @@ def test_source_fetch(cli, tmpdir, datafiles):
             element._initialize_state()
 
             # check that we have the source in the cas now and it's not fetched
+            element._query_source_cache()
             assert element._cached_sources()
             assert os.listdir(os.path.join(str(tmpdir), "cache", "sources", "git")) == []
 
@@ -135,6 +137,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
 
             element = project.load_elements([element_name])[0]
             element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
@@ -153,6 +156,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
             # Check that the source in both in the source dir and the local CAS
             element = project.load_elements([element_name])[0]
             element._initialize_state()
+            element._query_source_cache()
             assert element._cached_sources()
 
 
@@ -169,6 +173,7 @@ def test_pull_fail(cli, tmpdir, datafiles):
 
             element = project.load_elements([element_name])[0]
             element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
@@ -201,6 +206,7 @@ def test_source_pull_partial_fallback_fetch(cli, tmpdir, datafiles):
 
             element = project.load_elements([element_name])[0]
             element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index 25a4309..aa703de 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -85,6 +85,7 @@ def test_source_push_split(cli, tmpdir, datafiles):
 
             element = project.load_elements(["push.bst"])[0]
             element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
@@ -135,6 +136,7 @@ def test_source_push(cli, tmpdir, datafiles):
 
             element = project.load_elements(["push.bst"])[0]
             element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py
index 0f2f058..e0e7002 100644
--- a/tests/sourcecache/staging.py
+++ b/tests/sourcecache/staging.py
@@ -65,6 +65,7 @@ def test_source_staged(tmpdir, cli, datafiles):
         # seems to be the only way to get the sources?
         element = project.load_elements(["import-bin.bst"])[0]
         element._initialize_state()
+        element._query_source_cache()
         source = list(element.sources())[0]
         assert element._cached_sources()
         assert sourcecache.contains(source)
@@ -100,6 +101,7 @@ def test_source_fetch(tmpdir, cli, datafiles):
 
         element = project.load_elements(["import-dev.bst"])[0]
         element._initialize_state()
+        element._query_source_cache()
         source = list(element.sources())[0]
         assert element._cached_sources()
 
@@ -136,6 +138,7 @@ def test_staged_source_build(tmpdir, datafiles, cli):
         element._initialize_state()
 
         # check consistency of the source
+        element._query_source_cache()
         assert not element._cached_sources()
 
     res = cli.run(project=project_dir, args=["build", "target.bst"])
diff --git a/tests/sources/git.py b/tests/sources/git.py
index 30657d8..861e70c 100644
--- a/tests/sources/git.py
+++ b/tests/sources/git.py
@@ -401,10 +401,14 @@ def test_submodule_track_ignore_inconsistent(cli, tmpdir, datafiles):
     result = cli.run(project=project, args=["source", "fetch", "target.bst"])
     result.assert_success()
 
-    # Track will encounter an inconsistent submodule without any ref
+    # Track to update to the offending commit
     result = cli.run(project=project, args=["source", "track", "target.bst"])
     result.assert_success()
 
+    # Fetch after track will encounter an inconsistent submodule without any ref
+    result = cli.run(project=project, args=["source", "fetch", "target.bst"])
+    result.assert_success()
+
     # Assert that we are just fine without it, and emit a warning to the user.
     assert "Ignoring inconsistent submodule" in result.stderr
 

Reply via email to