This is an automated email from the ASF dual-hosted git repository.

pcongiusti pushed a commit to branch release-1.10.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 82e0c81075e213a4bb735a05e7b1be4e4ffd7168
Author: Christoph Deppisch <[email protected]>
AuthorDate: Wed Apr 5 21:11:33 2023 +0200

    fix: Limit parallel builds on operator
    
    - Avoid many parallel integration builds
    - Monitor all builds started by the operator instance and limit max number 
of running builds according to given setting
    - By default use max running builds limit = 3 for build strategy routine
    - By default use max running builds limit = 10 for build strategy pod
    - Add max running builds setting to IntegrationPlatform
    - Add some documentation on build strategy and build queues
---
 config/crd/bases/camel.apache.org_builds.yaml      |   5 +
 .../camel.apache.org_integrationplatforms.yaml     |  10 +
 docs/modules/ROOT/pages/architecture/cr/build.adoc |  31 +++
 docs/modules/ROOT/partials/apis/camel-k-crds.adoc  |  14 ++
 helm/camel-k/crds/crd-build.yaml                   |   5 +
 helm/camel-k/crds/crd-integration-platform.yaml    |  10 +
 pkg/apis/camel/v1/build_types.go                   |   2 +
 pkg/apis/camel/v1/integrationplatform_types.go     |   2 +
 pkg/cmd/install.go                                 |   5 +
 pkg/controller/build/build_controller.go           |   8 +-
 pkg/controller/build/build_monitor.go              | 107 +++++++++
 pkg/controller/build/build_monitor_test.go         | 240 +++++++++++++++++++++
 pkg/controller/build/monitor_pod.go                |  11 +
 pkg/controller/build/monitor_routine.go            |   7 +
 pkg/controller/build/schedule.go                   |  54 ++---
 pkg/controller/integrationkit/build.go             |   7 +-
 pkg/platform/defaults.go                           |   9 +
 pkg/resources/resources.go                         |   8 +-
 18 files changed, 484 insertions(+), 51 deletions(-)

diff --git a/config/crd/bases/camel.apache.org_builds.yaml 
b/config/crd/bases/camel.apache.org_builds.yaml
index f7d585a7e..06f423629 100644
--- a/config/crd/bases/camel.apache.org_builds.yaml
+++ b/config/crd/bases/camel.apache.org_builds.yaml
@@ -79,6 +79,11 @@ spec:
           spec:
             description: BuildSpec defines the Build operation to be executed
             properties:
+              maxRunningBuilds:
+                description: the maximum amount of parallel running builds 
started
+                  by this operator instance
+                format: int32
+                type: integer
               strategy:
                 description: The strategy that should be used to perform the 
Build.
                 enum:
diff --git a/config/crd/bases/camel.apache.org_integrationplatforms.yaml 
b/config/crd/bases/camel.apache.org_integrationplatforms.yaml
index 96f0aec8f..a93b03f1c 100644
--- a/config/crd/bases/camel.apache.org_integrationplatforms.yaml
+++ b/config/crd/bases/camel.apache.org_integrationplatforms.yaml
@@ -271,6 +271,11 @@ spec:
                       Persistent Volume Claim used by Kaniko publish strategy, 
if
                       cache is enabled'
                     type: string
+                  maxRunningBuilds:
+                    description: the maximum amount of parallel running builds 
started
+                      by this operator instance
+                    format: int32
+                    type: integer
                   publishStrategy:
                     description: the strategy to adopt for publishing an 
Integration
                       base image
@@ -1864,6 +1869,11 @@ spec:
                       Persistent Volume Claim used by Kaniko publish strategy, 
if
                       cache is enabled'
                     type: string
+                  maxRunningBuilds:
+                    description: the maximum amount of parallel running builds 
started
+                      by this operator instance
+                    format: int32
+                    type: integer
                   publishStrategy:
                     description: the strategy to adopt for publishing an 
Integration
                       base image
diff --git a/docs/modules/ROOT/pages/architecture/cr/build.adoc 
b/docs/modules/ROOT/pages/architecture/cr/build.adoc
index 60cc9cae2..e7502616a 100644
--- a/docs/modules/ROOT/pages/architecture/cr/build.adoc
+++ b/docs/modules/ROOT/pages/architecture/cr/build.adoc
@@ -3,6 +3,8 @@
 
 A *Build* resource, describes the process of assembling a container image that 
copes with the requirement of an 
xref:architecture/cr/integration.adoc[Integration] or 
xref:architecture/cr/integration-kit.adoc[IntegrationKit].
 
+The result of a build is an 
xref:architecture/cr/integration-kit.adoc[IntegrationKit] that can and should 
be reused for multiple xref:architecture/cr/integration.adoc[Integrations].
+
 [source,go]
 ----
 type Build struct {
@@ -25,3 +27,32 @@ the full go definition can be found 
https://github.com/apache/camel-k/blob/main/
 
 image::architecture/camel-k-state-machine-build.png[life cycle]
 
+[[build-strategy]]
+= Build strategy
+
+You can choose from different build strategies. The build strategy defines how 
a build should be executed.
+At the moment the available strategies are:
+
+- buildStrategy: pod (each build is run in a separate pod, the operator 
monitors the pod state)
+- buildStrategy: routine (each build is run as a go routine inside the 
operator pod)
+
+[[build-queue]]
+= Build queues
+
+IntegrationKits and their base images should be reused for multiple Integrations 
in order to
+accomplish efficient resource management and to optimize build and startup 
times for Camel K Integrations.
+
+In order to reuse images the operator is going to queue builds in sequential 
order.
+This way the operator is able to use efficient image layering for Integrations.
+
+By default, builds are queued sequentially based on their layout (e.g. native, 
fast-jar) and the build namespace.
+
+To avoid having many builds running in parallel the operator uses a maximum 
number of running builds setting that limits the
+number of builds running.
+
+You can set this limit in the 
xref:architecture/cr/integration-platform.adoc[IntegrationPlatform] settings.
+
+The default value for this limitation is based on the build strategy.
+
+- buildStrategy: pod (MaxRunningBuilds=10)
+- buildStrategy: routine (MaxRunningBuilds=3)
diff --git a/docs/modules/ROOT/partials/apis/camel-k-crds.adoc 
b/docs/modules/ROOT/partials/apis/camel-k-crds.adoc
index 39a6f08fa..cdaa09dfe 100644
--- a/docs/modules/ROOT/partials/apis/camel-k-crds.adoc
+++ b/docs/modules/ROOT/partials/apis/camel-k-crds.adoc
@@ -439,6 +439,13 @@ The Build deadline is set to the Build start time plus the 
Timeout duration.
 If the Build deadline is exceeded, the Build context is canceled,
 and its phase set to BuildPhaseFailed.
 
+|`maxRunningBuilds` +
+int32
+|
+
+
+the maximum amount of parallel running builds started by this operator instance
+
 
 |===
 
@@ -1721,6 +1728,13 @@ map[string]string
 
 
 
+|`maxRunningBuilds` +
+int32
+|
+
+
+the maximum amount of parallel running builds started by this operator instance
+
 
 |===
 
diff --git a/helm/camel-k/crds/crd-build.yaml b/helm/camel-k/crds/crd-build.yaml
index f7d585a7e..06f423629 100644
--- a/helm/camel-k/crds/crd-build.yaml
+++ b/helm/camel-k/crds/crd-build.yaml
@@ -79,6 +79,11 @@ spec:
           spec:
             description: BuildSpec defines the Build operation to be executed
             properties:
+              maxRunningBuilds:
+                description: the maximum amount of parallel running builds 
started
+                  by this operator instance
+                format: int32
+                type: integer
               strategy:
                 description: The strategy that should be used to perform the 
Build.
                 enum:
diff --git a/helm/camel-k/crds/crd-integration-platform.yaml 
b/helm/camel-k/crds/crd-integration-platform.yaml
index 96f0aec8f..a93b03f1c 100644
--- a/helm/camel-k/crds/crd-integration-platform.yaml
+++ b/helm/camel-k/crds/crd-integration-platform.yaml
@@ -271,6 +271,11 @@ spec:
                       Persistent Volume Claim used by Kaniko publish strategy, 
if
                       cache is enabled'
                     type: string
+                  maxRunningBuilds:
+                    description: the maximum amount of parallel running builds 
started
+                      by this operator instance
+                    format: int32
+                    type: integer
                   publishStrategy:
                     description: the strategy to adopt for publishing an 
Integration
                       base image
@@ -1864,6 +1869,11 @@ spec:
                       Persistent Volume Claim used by Kaniko publish strategy, 
if
                       cache is enabled'
                     type: string
+                  maxRunningBuilds:
+                    description: the maximum amount of parallel running builds 
started
+                      by this operator instance
+                    format: int32
+                    type: integer
                   publishStrategy:
                     description: the strategy to adopt for publishing an 
Integration
                       base image
diff --git a/pkg/apis/camel/v1/build_types.go b/pkg/apis/camel/v1/build_types.go
index 0de3fd47c..9388c977c 100644
--- a/pkg/apis/camel/v1/build_types.go
+++ b/pkg/apis/camel/v1/build_types.go
@@ -37,6 +37,8 @@ type BuildSpec struct {
        // and its phase set to BuildPhaseFailed.
        // +kubebuilder:validation:Format=duration
        Timeout metav1.Duration `json:"timeout,omitempty"`
+       // the maximum amount of parallel running builds started by this 
operator instance
+       MaxRunningBuilds int32 `json:"maxRunningBuilds,omitempty"`
 }
 
 // Task represents the abstract task. Only one of the task should be 
configured to represent the specific task chosen.
diff --git a/pkg/apis/camel/v1/integrationplatform_types.go 
b/pkg/apis/camel/v1/integrationplatform_types.go
index 8f4551841..1049bfec4 100644
--- a/pkg/apis/camel/v1/integrationplatform_types.go
+++ b/pkg/apis/camel/v1/integrationplatform_types.go
@@ -134,6 +134,8 @@ type IntegrationPlatformBuildSpec struct {
        PersistentVolumeClaim string `json:"persistentVolumeClaim,omitempty"`
        //
        PublishStrategyOptions map[string]string 
`json:"PublishStrategyOptions,omitempty"`
+       // the maximum amount of parallel running builds started by this 
operator instance
+       MaxRunningBuilds int32 `json:"maxRunningBuilds,omitempty"`
 }
 
 // IntegrationPlatformKameletSpec define the behavior for all the Kamelets 
controller by the IntegrationPlatform
diff --git a/pkg/cmd/install.go b/pkg/cmd/install.go
index e953ea99c..ed9f57efa 100644
--- a/pkg/cmd/install.go
+++ b/pkg/cmd/install.go
@@ -196,6 +196,7 @@ type installCmdOptions struct {
        HealthPort               int32    `mapstructure:"health-port"`
        Monitoring               bool     `mapstructure:"monitoring"`
        MonitoringPort           int32    `mapstructure:"monitoring-port"`
+       MaxRunningBuilds         int32    `mapstructure:"max-running-builds"`
        TraitProfile             string   `mapstructure:"trait-profile"`
        Tolerations              []string `mapstructure:"tolerations"`
        NodeSelectors            []string `mapstructure:"node-selectors"`
@@ -435,6 +436,10 @@ func (o *installCmdOptions) install(cobraCmd 
*cobra.Command, _ []string) error {
                                Duration: d,
                        }
                }
+               if o.MaxRunningBuilds > 0 {
+                       platform.Spec.Build.MaxRunningBuilds = 
o.MaxRunningBuilds
+               }
+
                if o.TraitProfile != "" {
                        platform.Spec.Profile = 
v1.TraitProfileByName(o.TraitProfile)
                }
diff --git a/pkg/controller/build/build_controller.go 
b/pkg/controller/build/build_controller.go
index 117ccaa1e..52fac6484 100644
--- a/pkg/controller/build/build_controller.go
+++ b/pkg/controller/build/build_controller.go
@@ -146,11 +146,15 @@ func (r *reconcileBuild) Reconcile(ctx context.Context, 
request reconcile.Reques
 
        var actions []Action
 
+       buildMonitor := Monitor{
+               maxRunningBuilds: instance.Spec.MaxRunningBuilds,
+       }
+
        switch instance.Spec.Strategy {
        case v1.BuildStrategyPod:
                actions = []Action{
                        newInitializePodAction(r.reader),
-                       newScheduleAction(r.reader),
+                       newScheduleAction(r.reader, buildMonitor),
                        newMonitorPodAction(r.reader),
                        newErrorRecoveryAction(),
                        newErrorAction(),
@@ -158,7 +162,7 @@ func (r *reconcileBuild) Reconcile(ctx context.Context, 
request reconcile.Reques
        case v1.BuildStrategyRoutine:
                actions = []Action{
                        newInitializeRoutineAction(),
-                       newScheduleAction(r.reader),
+                       newScheduleAction(r.reader, buildMonitor),
                        newMonitorRoutineAction(),
                        newErrorRecoveryAction(),
                        newErrorAction(),
diff --git a/pkg/controller/build/build_monitor.go 
b/pkg/controller/build/build_monitor.go
new file mode 100644
index 000000000..c7d8443d5
--- /dev/null
+++ b/pkg/controller/build/build_monitor.go
@@ -0,0 +1,107 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package build
+
+import (
+       "context"
+       "sync"
+
+       "k8s.io/apimachinery/pkg/labels"
+       "k8s.io/apimachinery/pkg/selection"
+       "k8s.io/apimachinery/pkg/types"
+       ctrl "sigs.k8s.io/controller-runtime/pkg/client"
+
+       v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+       "github.com/apache/camel-k/pkg/util/kubernetes"
+)
+
+var runningBuilds sync.Map
+
+type Monitor struct {
+       maxRunningBuilds int32
+}
+
+func (bm *Monitor) canSchedule(ctx context.Context, c ctrl.Reader, build 
*v1.Build) (bool, error) {
+       var runningBuildsTotal int32
+       runningBuilds.Range(func(_, v interface{}) bool {
+               runningBuildsTotal++
+               return true
+       })
+
+       if runningBuildsTotal >= bm.maxRunningBuilds {
+               requestName := build.Name
+               requestNamespace := build.Namespace
+               buildCreator := kubernetes.GetCamelCreator(build)
+               if buildCreator != nil {
+                       requestName = buildCreator.Name
+                       requestNamespace = buildCreator.Namespace
+               }
+
+               Log.WithValues("request-namespace", requestNamespace, 
"request-name", requestName, "max-running-builds-limit", runningBuildsTotal).
+                       ForBuild(build).Infof("Maximum number of running builds 
(%d) exceeded - the build gets enqueued", runningBuildsTotal)
+
+               // max number of running builds limit exceeded
+               return false, nil
+       }
+
+       layout := build.Labels[v1.IntegrationKitLayoutLabel]
+
+       // Native builds can be run in parallel, as incremental images is not 
applicable.
+       if layout == v1.IntegrationKitLayoutNative {
+               return true, nil
+       }
+
+       // We assume incremental images is only applicable across images whose 
layout is identical
+       withCompatibleLayout, err := 
labels.NewRequirement(v1.IntegrationKitLayoutLabel, selection.Equals, 
[]string{layout})
+       if err != nil {
+               return false, err
+       }
+
+       builds := &v1.BuildList{}
+       // We use the non-caching client as informers cache is not invalidated 
nor updated
+       // atomically by write operations
+       err = c.List(ctx, builds,
+               ctrl.InNamespace(build.Namespace),
+               ctrl.MatchingLabelsSelector{
+                       Selector: 
labels.NewSelector().Add(*withCompatibleLayout),
+               })
+       if err != nil {
+               return false, err
+       }
+
+       // Emulate a serialized working queue to only allow one build to run at 
a given time.
+       // This is currently necessary for the incremental build to work as 
expected.
+       // We may want to explicitly manage build priority as opposed to 
relying on
+       // the reconciliation loop to handle the queuing.
+       for _, b := range builds.Items {
+               if b.Status.Phase == v1.BuildPhasePending || b.Status.Phase == 
v1.BuildPhaseRunning {
+                       // Let's requeue the build in case one is already 
running
+                       return false, nil
+               }
+       }
+
+       return true, nil
+}
+
+func monitorRunningBuild(build *v1.Build) {
+       runningBuilds.Store(types.NamespacedName{Namespace: build.Namespace, 
Name: build.Name}.String(), true)
+}
+
+func monitorFinishedBuild(build *v1.Build) {
+       runningBuilds.Delete(types.NamespacedName{Namespace: build.Namespace, 
Name: build.Name}.String())
+}
diff --git a/pkg/controller/build/build_monitor_test.go 
b/pkg/controller/build/build_monitor_test.go
new file mode 100644
index 000000000..cadb37330
--- /dev/null
+++ b/pkg/controller/build/build_monitor_test.go
@@ -0,0 +1,240 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package build
+
+import (
+       "context"
+       "testing"
+       "time"
+
+       v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+       "github.com/apache/camel-k/pkg/util/test"
+
+       "github.com/stretchr/testify/assert"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/runtime"
+)
+
+func TestMonitorBuilds(t *testing.T) {
+       testcases := []struct {
+               name     string
+               running  []*v1.Build
+               finished []*v1.Build
+               build    *v1.Build
+               allowed  bool
+       }{
+               {
+                       name:     "allowNewBuild",
+                       running:  []*v1.Build{},
+                       finished: []*v1.Build{},
+                       build:    newBuild("ns", "my-build"),
+                       allowed:  true,
+               },
+               {
+                       name:     "allowNewNativeBuild",
+                       running:  []*v1.Build{},
+                       finished: []*v1.Build{},
+                       build:    newNativeBuild("ns", "my-build"),
+                       allowed:  true,
+               },
+               {
+                       name:    "allowNewBuildWhenOthersFinished",
+                       running: []*v1.Build{},
+                       finished: []*v1.Build{
+                               newBuildInPhase("ns", "my-build-x", 
v1.BuildPhaseSucceeded),
+                               newBuildInPhase("ns", "my-build-failed", 
v1.BuildPhaseFailed),
+                       },
+                       build:   newBuild("ns", "my-build"),
+                       allowed: true,
+               },
+               {
+                       name:    "allowNewNativeBuildWhenOthersFinished",
+                       running: []*v1.Build{},
+                       finished: []*v1.Build{
+                               newNativeBuildInPhase("ns", "my-build-x", 
v1.BuildPhaseSucceeded),
+                               newNativeBuildInPhase("ns", "my-build-failed", 
v1.BuildPhaseFailed),
+                       },
+                       build:   newNativeBuild("ns", "my-build"),
+                       allowed: true,
+               },
+               {
+                       name: "limitMaxRunningBuilds",
+                       running: []*v1.Build{
+                               newBuild("some-ns", "my-build-1"),
+                               newBuild("other-ns", "my-build-2"),
+                               newBuild("another-ns", "my-build-3"),
+                       },
+                       finished: []*v1.Build{
+                               newBuildInPhase("ns", "my-build-x", 
v1.BuildPhaseSucceeded),
+                       },
+                       build:   newBuild("ns", "my-build"),
+                       allowed: false,
+               },
+               {
+                       name: "limitMaxRunningNativeBuilds",
+                       running: []*v1.Build{
+                               newBuildInPhase("some-ns", "my-build-1", 
v1.BuildPhaseRunning),
+                               newNativeBuildInPhase("other-ns", "my-build-2", 
v1.BuildPhaseRunning),
+                               newNativeBuildInPhase("another-ns", 
"my-build-3", v1.BuildPhaseRunning),
+                       },
+                       finished: []*v1.Build{
+                               newNativeBuildInPhase("ns", "my-build-x", 
v1.BuildPhaseSucceeded),
+                       },
+                       build:   newNativeBuildInPhase("ns", "my-build", 
v1.BuildPhaseInitialization),
+                       allowed: false,
+               },
+               {
+                       name: "allowParallelBuildsWithDifferentLayout",
+                       running: []*v1.Build{
+                               newNativeBuildInPhase("ns", "my-build-1", 
v1.BuildPhaseRunning),
+                       },
+                       build:   newBuild("ns", "my-build"),
+                       allowed: true,
+               },
+               {
+                       name: "queueBuildsInSameNamespaceWithSameLayout",
+                       running: []*v1.Build{
+                               newBuild("ns", "my-build-1"),
+                               newBuild("other-ns", "my-build-2"),
+                       },
+                       finished: []*v1.Build{
+                               newBuildInPhase("ns", "my-build-x", 
v1.BuildPhaseSucceeded),
+                       },
+                       build:   newBuild("ns", "my-build"),
+                       allowed: false,
+               },
+               {
+                       name: "allowBuildsInNewNamespace",
+                       running: []*v1.Build{
+                               newBuild("some-ns", "my-build-1"),
+                               newBuild("other-ns", "my-build-2"),
+                       },
+                       finished: []*v1.Build{
+                               newBuildInPhase("ns", "my-build-x", 
v1.BuildPhaseSucceeded),
+                       },
+                       build:   newBuild("ns", "my-build"),
+                       allowed: true,
+               },
+       }
+
+       for _, tc := range testcases {
+               t.Run(tc.name, func(t *testing.T) {
+                       var initObjs []runtime.Object
+                       for _, build := range append(tc.running, 
tc.finished...) {
+                               initObjs = append(initObjs, build)
+                       }
+
+                       c, err := test.NewFakeClient(initObjs...)
+
+                       assert.Nil(t, err)
+
+                       bm := Monitor{
+                               maxRunningBuilds: 3,
+                       }
+
+                       // reset running builds in memory cache
+                       cleanRunningBuildsMonitor()
+                       for _, build := range tc.running {
+                               monitorRunningBuild(build)
+                       }
+
+                       allowed, err := bm.canSchedule(context.TODO(), c, 
tc.build)
+
+                       assert.Nil(t, err)
+                       assert.Equal(t, tc.allowed, allowed)
+               })
+       }
+}
+
+func TestAllowBuildRequeue(t *testing.T) {
+       c, err := test.NewFakeClient()
+
+       assert.Nil(t, err)
+
+       bm := Monitor{
+               maxRunningBuilds: 3,
+       }
+
+       runningBuild := newBuild("some-ns", "my-build-1")
+       // reset running builds in memory cache
+       cleanRunningBuildsMonitor()
+       monitorRunningBuild(runningBuild)
+       monitorRunningBuild(newBuild("other-ns", "my-build-2"))
+       monitorRunningBuild(newBuild("another-ns", "my-build-3"))
+
+       build := newBuild("ns", "my-build")
+       allowed, err := bm.canSchedule(context.TODO(), c, build)
+
+       assert.Nil(t, err)
+       assert.False(t, allowed)
+
+       monitorFinishedBuild(runningBuild)
+
+       allowed, err = bm.canSchedule(context.TODO(), c, build)
+
+       assert.Nil(t, err)
+       assert.True(t, allowed)
+}
+
+func cleanRunningBuildsMonitor() {
+       runningBuilds.Range(func(key interface{}, v interface{}) bool {
+               runningBuilds.Delete(key)
+               return true
+       })
+}
+
+func newBuild(namespace string, name string) *v1.Build {
+       return newBuildWithLayoutInPhase(namespace, name, 
v1.IntegrationKitLayoutFastJar, v1.BuildPhasePending)
+}
+
+func newNativeBuild(namespace string, name string) *v1.Build {
+       return newBuildWithLayoutInPhase(namespace, name, 
v1.IntegrationKitLayoutNative, v1.BuildPhasePending)
+}
+
+func newBuildInPhase(namespace string, name string, phase v1.BuildPhase) 
*v1.Build {
+       return newBuildWithLayoutInPhase(namespace, name, 
v1.IntegrationKitLayoutFastJar, phase)
+}
+
+func newNativeBuildInPhase(namespace string, name string, phase v1.BuildPhase) 
*v1.Build {
+       return newBuildWithLayoutInPhase(namespace, name, 
v1.IntegrationKitLayoutNative, phase)
+}
+
+func newBuildWithLayoutInPhase(namespace string, name string, layout string, 
phase v1.BuildPhase) *v1.Build {
+       return &v1.Build{
+               TypeMeta: metav1.TypeMeta{
+                       APIVersion: v1.SchemeGroupVersion.String(),
+                       Kind:       v1.BuildKind,
+               },
+               ObjectMeta: metav1.ObjectMeta{
+                       Namespace: namespace,
+                       Name:      name,
+                       Labels: map[string]string{
+                               v1.IntegrationKitLayoutLabel: layout,
+                       },
+               },
+               Spec: v1.BuildSpec{
+                       Strategy:         v1.BuildStrategyRoutine,
+                       Tasks:            []v1.Task{},
+                       Timeout:          metav1.Duration{Duration: 5 * 
time.Minute},
+                       MaxRunningBuilds: 3,
+               },
+               Status: v1.BuildStatus{
+                       Phase: phase,
+               },
+       }
+}
diff --git a/pkg/controller/build/monitor_pod.go 
b/pkg/controller/build/monitor_pod.go
index baaa5800c..0cd4bb859 100644
--- a/pkg/controller/build/monitor_pod.go
+++ b/pkg/controller/build/monitor_pod.go
@@ -87,6 +87,7 @@ func (action *monitorPodAction) Handle(ctx context.Context, 
build *v1.Build) (*v
                        // Emulate context cancellation
                        build.Status.Phase = v1.BuildPhaseInterrupted
                        build.Status.Error = "Pod deleted"
+                       monitorFinishedBuild(build)
                        return build, nil
                }
        }
@@ -109,6 +110,12 @@ func (action *monitorPodAction) Handle(ctx 
context.Context, build *v1.Build) (*v
                                // Requeue
                                return nil, err
                        }
+
+                       monitorFinishedBuild(build)
+               } else {
+                       // Monitor running state of the build - this may have 
been done already by the schedule action but the build monitor is idempotent
+                       // We do this here to potentially restore the running 
build state in the monitor in case of an operator restart
+                       monitorRunningBuild(build)
                }
 
        case corev1.PodSucceeded:
@@ -122,6 +129,8 @@ func (action *monitorPodAction) Handle(ctx context.Context, 
build *v1.Build) (*v
                duration := finishedAt.Sub(build.Status.StartedAt.Time)
                build.Status.Duration = duration.String()
 
+               monitorFinishedBuild(build)
+
                buildCreator := kubernetes.GetCamelCreator(build)
                // Account for the Build metrics
                observeBuildResult(build, build.Status.Phase, buildCreator, 
duration)
@@ -168,6 +177,8 @@ func (action *monitorPodAction) Handle(ctx context.Context, 
build *v1.Build) (*v
                duration := finishedAt.Sub(build.Status.StartedAt.Time)
                build.Status.Duration = duration.String()
 
+               monitorFinishedBuild(build)
+
                buildCreator := kubernetes.GetCamelCreator(build)
                // Account for the Build metrics
                observeBuildResult(build, build.Status.Phase, buildCreator, 
duration)
diff --git a/pkg/controller/build/monitor_routine.go 
b/pkg/controller/build/monitor_routine.go
index 055d960bd..b4d0b8a5e 100644
--- a/pkg/controller/build/monitor_routine.go
+++ b/pkg/controller/build/monitor_routine.go
@@ -68,6 +68,7 @@ func (action *monitorRoutineAction) Handle(ctx 
context.Context, build *v1.Build)
                        routines.Delete(build.Name)
                        build.Status.Phase = v1.BuildPhaseFailed
                        build.Status.Error = "Build routine exists"
+                       monitorFinishedBuild(build)
                        return build, nil
                }
                status := v1.BuildStatus{Phase: v1.BuildPhaseRunning}
@@ -86,10 +87,14 @@ func (action *monitorRoutineAction) Handle(ctx 
context.Context, build *v1.Build)
                        // stops abruptly and restarts or the build status 
update fails.
                        build.Status.Phase = v1.BuildPhaseFailed
                        build.Status.Error = "Build routine not running"
+                       monitorFinishedBuild(build)
                        return build, nil
                }
        }
 
+       // Monitor running state of the build - this may have been done already 
by the schedule action but the monitor action is idempotent
+       // We do this here to recover the running build state in the monitor in 
case of an operator restart
+       monitorRunningBuild(build)
        return nil, nil
 }
 
@@ -176,6 +181,8 @@ tasks:
        duration := metav1.Now().Sub(build.Status.StartedAt.Time)
        status.Duration = duration.String()
 
+       monitorFinishedBuild(build)
+
        buildCreator := kubernetes.GetCamelCreator(build)
        // Account for the Build metrics
        observeBuildResult(build, status.Phase, buildCreator, duration)
diff --git a/pkg/controller/build/schedule.go b/pkg/controller/build/schedule.go
index 12e879d61..0bedfb0ed 100644
--- a/pkg/controller/build/schedule.go
+++ b/pkg/controller/build/schedule.go
@@ -22,9 +22,6 @@ import (
        "sync"
 
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-       "k8s.io/apimachinery/pkg/labels"
-       "k8s.io/apimachinery/pkg/selection"
-
        ctrl "sigs.k8s.io/controller-runtime/pkg/client"
 
        v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
@@ -32,16 +29,18 @@ import (
        "github.com/apache/camel-k/pkg/util/kubernetes"
 )
 
-func newScheduleAction(reader ctrl.Reader) Action {
+func newScheduleAction(reader ctrl.Reader, buildMonitor Monitor) Action {
        return &scheduleAction{
-               reader: reader,
+               reader:       reader,
+               buildMonitor: buildMonitor,
        }
 }
 
 type scheduleAction struct {
        baseAction
-       lock   sync.Mutex
-       reader ctrl.Reader
+       lock         sync.Mutex
+       reader       ctrl.Reader
+       buildMonitor Monitor
 }
 
 // Name returns a common name of the action.
@@ -60,42 +59,11 @@ func (action *scheduleAction) Handle(ctx context.Context, 
build *v1.Build) (*v1.
        action.lock.Lock()
        defer action.lock.Unlock()
 
-       layout := build.Labels[v1.IntegrationKitLayoutLabel]
-
-       // Native builds can be run in parallel, as incremental images is not 
applicable.
-       if layout == v1.IntegrationKitLayoutNative {
-               // Reset the Build status, and transition it to pending phase.
-               // This must be done in the critical section, rather than 
delegated to the controller.
-               return nil, action.toPendingPhase(ctx, build)
-       }
-
-       // We assume incremental images is only applicable across images whose 
layout is identical
-       withCompatibleLayout, err := 
labels.NewRequirement(v1.IntegrationKitLayoutLabel, selection.Equals, 
[]string{layout})
-       if err != nil {
+       if allowed, err := action.buildMonitor.canSchedule(ctx, action.reader, 
build); err != nil {
                return nil, err
-       }
-
-       builds := &v1.BuildList{}
-       // We use the non-caching client as informers cache is not invalidated 
nor updated
-       // atomically by write operations
-       err = action.reader.List(ctx, builds,
-               ctrl.InNamespace(build.Namespace),
-               ctrl.MatchingLabelsSelector{
-                       Selector: 
labels.NewSelector().Add(*withCompatibleLayout),
-               })
-       if err != nil {
-               return nil, err
-       }
-
-       // Emulate a serialized working queue to only allow one build to run at 
a given time.
-       // This is currently necessary for the incremental build to work as 
expected.
-       // We may want to explicitly manage build priority as opposed to 
relying on
-       // the reconciliation loop to handle the queuing.
-       for _, b := range builds.Items {
-               if b.Status.Phase == v1.BuildPhasePending || b.Status.Phase == 
v1.BuildPhaseRunning {
-                       // Let's requeue the build in case one is already 
running
-                       return nil, nil
-               }
+       } else if !allowed {
+               // Build not allowed at this state (probably max running builds 
limit exceeded) - let's requeue the build
+               return nil, nil
        }
 
        // Reset the Build status, and transition it to pending phase.
@@ -117,6 +85,8 @@ func (action *scheduleAction) toPendingPhase(ctx 
context.Context, build *v1.Buil
                return err
        }
 
+       monitorRunningBuild(build)
+
        buildCreator := kubernetes.GetCamelCreator(build)
        // Report the duration the Build has been waiting in the build queue
        observeBuildQueueDuration(build, buildCreator)
diff --git a/pkg/controller/integrationkit/build.go 
b/pkg/controller/integrationkit/build.go
index 9ea226fac..8f5524779 100644
--- a/pkg/controller/integrationkit/build.go
+++ b/pkg/controller/integrationkit/build.go
@@ -115,9 +115,10 @@ func (action *buildAction) handleBuildSubmitted(ctx 
context.Context, kit *v1.Int
                                Annotations: annotations,
                        },
                        Spec: v1.BuildSpec{
-                               Strategy: 
env.Platform.Status.Build.BuildStrategy,
-                               Tasks:    env.BuildTasks,
-                               Timeout:  timeout,
+                               Strategy:         
env.Platform.Status.Build.BuildStrategy,
+                               Tasks:            env.BuildTasks,
+                               Timeout:          timeout,
+                               MaxRunningBuilds: 
env.Platform.Status.Build.MaxRunningBuilds,
                        },
                }
 
diff --git a/pkg/platform/defaults.go b/pkg/platform/defaults.go
index bbe974659..b38541210 100644
--- a/pkg/platform/defaults.go
+++ b/pkg/platform/defaults.go
@@ -217,6 +217,15 @@ func setPlatformDefaults(p *v1.IntegrationPlatform, 
verbose bool) error {
                        Duration: 5 * time.Minute,
                }
        }
+
+       if p.Status.Build.MaxRunningBuilds <= 0 {
+               if p.Status.Build.BuildStrategy == v1.BuildStrategyRoutine {
+                       p.Status.Build.MaxRunningBuilds = 3
+               } else if p.Status.Build.BuildStrategy == v1.BuildStrategyPod {
+                       p.Status.Build.MaxRunningBuilds = 10
+               }
+       }
+
        _, cacheEnabled := 
p.Status.Build.PublishStrategyOptions["KanikoBuildCache"]
        if p.Status.Build.PublishStrategy == 
v1.IntegrationPlatformBuildPublishStrategyKaniko && !cacheEnabled {
                // Default to disabling Kaniko cache warmer
diff --git a/pkg/resources/resources.go b/pkg/resources/resources.go
index 7f2ebcfcf..9f5efc229 100644
--- a/pkg/resources/resources.go
+++ b/pkg/resources/resources.go
@@ -117,9 +117,9 @@ var assets = func() http.FileSystem {
                "/crd/bases/camel.apache.org_builds.yaml": 
&vfsgen۰CompressedFileInfo{
                        name:             "camel.apache.org_builds.yaml",
                        modTime:          time.Time{},
-                       uncompressedSize: 43400,
+                       uncompressedSize: 43619,
 
-                       compressedContent: 
[]byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\x3d\x5d\x73\x23\x39\x6e\xef\xfe\x15\xa8\xf5\xc3\x78\xaa\x2c\x69\x77\xef\x23\x1b\xa7\x52\x29\x9f\x67\x77\xcf\x99\x0f\x3b\x23\xef\xdc\xdd\x9b\xa9\x6e\x48\xe2\xa9\x9b\xec\x90\x6c\x6b\x74\xa9\xfc\xf7\x14\x41\xb2\xd5\x92\xfa\x83\xed\x8f\x99\xcd\xad\xf8\x32\xe3\x16\x09\x02\x20\x09\x80\x20\x08\x9e\xc2\xe8\xf9\xca\xc9\x29\xbc\xe3\x09\x0a\x8d\x29\x18\x09\x66\x89\x70\x59\xb0\x64\x89\x30\x95\x73\xb3\x66\x0a\xe1\x27\x59\x8a\x
 [...]
+                       compressedContent: 
[]byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\x3d\x5d\x73\xe3\x38\x8e\xef\xf9\x15\xa8\xc9\x43\xa7\xab\x62\x7b\x3e\x76\xf7\xe6\x72\x75\x75\x95\x4d\xcf\xcc\xe6\xfa\x23\x7d\xed\x4c\xef\xee\x5b\x68\x09\xb6\xb9\x96\x48\x1d\x49\xc5\xf1\x5e\xdd\x7f\xbf\x22\x48\xca\xb2\xad\x0f\x2a\x1f\xdd\x7b\x3b\xe6\x4b\x77\x2c\x09\x04\x40\x10\x00\x41\x10\x3c\x85\xd1\xf3\xb5\x93\x53\x78\xc7\x13\x14\x1a\x53\x30\x12\xcc\x12\xe1\xb2\x60\xc9\x12\x61\x2a\xe7\x66\xcd\x14\xc2\xcf\xb2\x14\x
 [...]
                },
                "/crd/bases/camel.apache.org_camelcatalogs.yaml": 
&vfsgen۰CompressedFileInfo{
                        name:             "camel.apache.org_camelcatalogs.yaml",
@@ -138,9 +138,9 @@ var assets = func() http.FileSystem {
                "/crd/bases/camel.apache.org_integrationplatforms.yaml": 
&vfsgen۰CompressedFileInfo{
                        name:             
"camel.apache.org_integrationplatforms.yaml",
                        modTime:          time.Time{},
-                       uncompressedSize: 178941,
+                       uncompressedSize: 179419,
 
-                       compressedContent: 
[]byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xfd\xfd\x73\xdb\x36\xb6\x30\x8e\xff\x9e\xbf\x02\xe3\xce\x9d\x38\x19\x49\x4e\xba\xb7\xbb\xbd\x7e\xa6\xf3\x3c\xae\x93\xb6\x6e\xe2\xd8\xd7\x76\x72\xef\x4e\xdb\xa9\x20\xf2\x48\x42\x4c\x02\x5c\x00\x94\xad\x7e\xf7\xfb\xbf\x7f\x06\x07\x00\x49\x49\x24\x48\x49\x7e\x6b\x23\x76\x66\x37\xb6\x09\xf0\xe0\xe0\xe0\xbc\xe1\xbc\x7c\x45\xfa\x77\xf7\x3c\xfb\x8a\xbc\x67\x11\x70\x05\x31\xd1\x82\xe8\x29\x90\xa3\x8c\x46\x53\x20\x97\x62\x
 [...]
+                       compressedContent: 
[]byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xfd\xfd\x73\xdb\x36\xb6\x30\x8e\xff\x9e\xbf\x02\xe3\xce\x9d\x38\x19\x49\x4e\xba\xb7\xbb\xbd\x7e\xa6\xf3\x3c\xae\x93\xb6\x6e\xe2\xd8\xd7\x76\x72\xef\x4e\xdb\xa9\x20\xf2\x48\x42\x4c\x02\x5c\x00\x94\xad\x7e\xf7\xfb\xbf\x7f\x06\x07\x00\x49\x49\x24\x48\x49\x7e\x6b\x23\x76\x66\x37\xb6\x89\x43\xe0\xe0\xe0\xbc\xe1\xbc\x7c\x45\xfa\x77\xf7\x3c\xfb\x8a\xbc\x67\x11\x70\x05\x31\xd1\x82\xe8\x29\x90\xa3\x8c\x46\x53\x20\x97\x62\x
 [...]
                },
                "/crd/bases/camel.apache.org_integrations.yaml": 
&vfsgen۰CompressedFileInfo{
                        name:             "camel.apache.org_integrations.yaml",


Reply via email to