This is an automated email from the ASF dual-hosted git repository.
ricardozanini pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-serverless-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 4f2ba090 [issue-464] Create a Prometheus ServiceMonitor object that
can capture/collect metrics from deployed SonataFlow instances (#540)
4f2ba090 is described below
commit 4f2ba0905e801ec3ee5c647b70eb2d9a41fbeb01
Author: Jianrong Zhang <[email protected]>
AuthorDate: Wed Oct 23 10:49:23 2024 -0400
[issue-464] Create a Prometheus ServiceMonitor object that can
capture/collect metrics from deployed SonataFlow instances (#540)
---
.github/workflows/e2e.yml | 7 ++
Makefile | 26 ++++-
api/v1alpha08/sonataflowplatform_types.go | 12 +++
api/v1alpha08/zz_generated.deepcopy.go | 20 ++++
.../sonataflow.org_sonataflowplatforms.yaml | 8 ++
cmd/main.go | 5 +-
.../bases/sonataflow.org_sonataflowplatforms.yaml | 8 ++
config/rbac/role.yaml | 11 +++
go.work.sum | 2 +
internal/controller/knative/knative.go | 19 +---
.../controller/monitoring/monitoring.go | 40 +++++---
.../controller/profiles/common/object_creators.go | 33 ++++++-
.../profiles/common/object_creators_test.go | 28 ++++++
internal/controller/profiles/dev/profile_dev.go | 3 +
.../controller/profiles/dev/profile_dev_test.go | 19 ++--
internal/controller/profiles/dev/states_dev.go | 17 ++++
.../profiles/gitops/profile_gitops_test.go | 4 +-
.../controller/profiles/monitoring/monitoring.go | 66 +++++++++++++
.../profiles/preview/deployment_handler.go | 22 ++++-
.../profiles/preview/deployment_handler_test.go | 10 +-
.../controller/profiles/preview/profile_preview.go | 14 ++-
.../profiles/preview/profile_preview_test.go | 29 ++++--
internal/controller/sonataflow_controller.go | 11 +++
.../controller/sonataflowplatform_controller.go | 11 +++
.../sonataflowplatform_controller_test.go | 5 +-
operator.yaml | 19 ++++
test/e2e/e2e_suite_test.go | 4 +
test/e2e/helpers.go | 107 ++++++++++++++++++++-
.../k8s_deployment/01-sonataflow_platform.yaml | 47 ++-------
.../k8s_deployment/02-sonataflow_greetings.yaml | 63 +++++-------
.../prometheus/k8s_deployment/kustomization.yaml | 48 ++-------
test/e2e/workflow_test.go | 94 +++++++++++++++++-
test/kubernetes_cli.go | 2 +
.../rbac/role.yaml => test/testdata/grafana.yaml | 50 +++-------
.../role.yaml => test/testdata/prometheus.yaml | 79 +++++++++------
test/yaml.go | 3 +-
utils/client.go | 22 ++++-
utils/kubernetes/security.go | 3 +-
38 files changed, 699 insertions(+), 272 deletions(-)
diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index ec598cc8..be56273f 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -17,6 +17,7 @@ env:
PYTHON_VERSION: "3.10"
KIND_VERSION: v0.20.0
KNATIVE_VERSION: v1.12.5
+ PROMETHEUS_VERSION: v0.70.0
OPERATOR_IMAGE_NAME: "127.0.0.1:5001/kogito-serverless-operator:0.0.1"
jobs:
@@ -68,6 +69,9 @@ jobs:
- name: Deploy Knative Eventing and Serving
run: make KNATIVE_VERSION=${{ env.KNATIVE_VERSION }} deploy-knative
+ - name: Deploy Prometheus
+ run: make PROMETHEUS_VERSION=${{ env.PROMETHEUS_VERSION }}
deploy-prometheus
+
- name: Set OPERATOR_IMAGE_NAME to Point to Kind's Local Registry
run: echo "OPERATOR_IMAGE_NAME=${{ env.OPERATOR_IMAGE_NAME }}" >>
$GITHUB_ENV
@@ -92,6 +96,9 @@ jobs:
- name: Run E2E Tests for Persistent Flows
run: make test-e2e label=flows-persistence
+ - name: Run E2E Tests for Workflow Monitoring
+ run: make test-e2e label=flows-monitoring
+
- name: Run E2E Tests for Platform
run: make test-e2e label=platform
diff --git a/Makefile b/Makefile
index 81ec5a13..9b0dc792 100644
--- a/Makefile
+++ b/Makefile
@@ -123,8 +123,7 @@ test: manifests generate envtest test-api ## Run tests.
@$(MAKE) vet
@$(MAKE) fmt
@echo "🔍 Running controller tests..."
- @KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION)
--bin-dir $(LOCALBIN) -p path)" \
- go test $(shell go list ./... | grep -v /test/) -coverprofile cover.out
> /dev/null 2>&1
+ KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION)
--bin-dir $(LOCALBIN) -p path)" go test $(shell go list ./... | grep -v /test/)
-coverprofile cover.out
@echo "✅ Tests completed successfully. Coverage report generated:
cover.out."
.PHONY: test-api
@@ -264,6 +263,8 @@ GOLANGCI_LINT_VERSION ?= v1.57.2
KIND_VERSION ?= v0.20.0
KNATIVE_VERSION ?= v1.13.2
TIMEOUT_SECS ?= 180s
+PROMETHEUS_VERSION ?= v0.70.0
+GRAFANA_VERSION ?= v5.13.0
KNATIVE_SERVING_PREFIX ?=
"https://github.com/knative/serving/releases/download/knative-$(KNATIVE_VERSION)"
KNATIVE_EVENTING_PREFIX ?=
"https://github.com/knative/eventing/releases/download/knative-$(KNATIVE_VERSION)"
@@ -402,7 +403,7 @@ generate-all: generate generate-deploy bundle
@$(MAKE) fmt
.PHONY: test-e2e # You will need to have a Minikube/Kind cluster up and
running to run this target, and run container-builder before the test
-label = "flows-ephemeral" # possible values are flows-ephemeral,
flows-persistence, platform, cluster
+label = "flows-ephemeral" # possible values are flows-ephemeral,
flows-persistence, flows-monitoring, platform, cluster
test-e2e:
ifeq ($(label), cluster)
@echo "🌐 Running e2e tests for cluster..."
@@ -424,8 +425,13 @@ else ifeq ($(label), flows-persistence)
go test ./test/e2e/e2e_suite_test.go ./test/e2e/helpers.go
./test/e2e/workflow_test.go \
-v -ginkgo.v -ginkgo.no-color -ginkgo.github-output
-ginkgo.label-filter=$(label) \
-ginkgo.junit-report=./e2e-test-report-workflow_test.xml -timeout 60m
KUSTOMIZE=$(KUSTOMIZE);
+else ifeq ($(label), flows-monitoring)
+ @echo "🔁 Running e2e tests for flows-monitoring..."
+ go test ./test/e2e/e2e_suite_test.go ./test/e2e/helpers.go
./test/e2e/workflow_test.go \
+ -v -ginkgo.v -ginkgo.no-color -ginkgo.github-output
-ginkgo.label-filter=$(label) \
+ -ginkgo.junit-report=./e2e-test-report-workflow_test.xml -timeout 60m
KUSTOMIZE=$(KUSTOMIZE);
else
- @echo "❌ Invalid label. Please use one of: cluster, platform,
flows-ephemeral, flows-persistence"
+ @echo "❌ Invalid label. Please use one of: cluster, platform,
flows-ephemeral, flows-persistence, flows-monitoring"
endif
@@ -450,6 +456,18 @@ deploy-knative:
kubectl wait --for=condition=Ready=True KnativeServing/knative-serving
-n knative-serving --timeout=$(TIMEOUT_SECS)
kubectl wait --for=condition=Ready=True
KnativeEventing/knative-eventing -n knative-eventing --timeout=$(TIMEOUT_SECS)
+.PHONY: deploy-prometheus
+deploy-prometheus: create-cluster
+ kubectl create -f
https://github.com/prometheus-operator/prometheus-operator/releases/download/$(PROMETHEUS_VERSION)/bundle.yaml
+ kubectl wait --for=condition=Available=True deploy/prometheus-operator
-n default --timeout=$(TIMEOUT_SECS)
+ kubectl apply -f ./test/testdata/prometheus.yaml -n default
+ kubectl wait --for=condition=Available=True prometheus/prometheus -n
default --timeout=$(TIMEOUT_SECS)
+
+.PHONY: deploy-grafana
+deploy-grafana: create-cluster
+ kubectl create -f
https://github.com/grafana/grafana-operator/releases/download/$(GRAFANA_VERSION)/kustomize-cluster_scoped.yaml
+ kubectl wait --for=condition=Available=True
deploy/grafana-operator-controller-manager -n grafana --timeout=$(TIMEOUT_SECS)
+
.PHONY: delete-cluster
delete-cluster: install-kind
kind delete cluster && $(BUILDER) rm -f kind-registry
diff --git a/api/v1alpha08/sonataflowplatform_types.go
b/api/v1alpha08/sonataflowplatform_types.go
index e1a009e7..55823fed 100644
--- a/api/v1alpha08/sonataflowplatform_types.go
+++ b/api/v1alpha08/sonataflowplatform_types.go
@@ -63,6 +63,9 @@ type SonataFlowPlatformSpec struct {
// These properties MAY NOT be propagated to a
SonataFlowClusterPlatform since PropertyVarSource can only refer local context
sources.
// +optional
Properties *PropertyPlatformSpec `json:"properties,omitempty"`
+ // Settings for Prometheus monitoring
+ // +optional
+ Monitoring *PlatformMonitoringOptionsSpec `json:"monitoring,omitempty"`
}
// PlatformEventingSpec specifies the Knative Eventing integration details in
the platform.
@@ -74,6 +77,15 @@ type PlatformEventingSpec struct {
Broker *duckv1.Destination `json:"broker,omitempty"`
}
+// PlatformMonitoringOptionsSpec specifies the settings for monitoring
+// +k8s:openapi-gen=true
+type PlatformMonitoringOptionsSpec struct {
+ // Enabled indicates whether monitoring with Prometheus metrics is
enabled
+ // +optional
+ // +default: false
+ Enabled bool `json:"enabled,omitempty"`
+}
+
// PlatformCluster is the kind of orchestration cluster the platform is
installed into
// +kubebuilder:validation:Enum=kubernetes;openshift
type PlatformCluster string
diff --git a/api/v1alpha08/zz_generated.deepcopy.go
b/api/v1alpha08/zz_generated.deepcopy.go
index 6e137ac0..0a594f8a 100644
--- a/api/v1alpha08/zz_generated.deepcopy.go
+++ b/api/v1alpha08/zz_generated.deepcopy.go
@@ -449,6 +449,21 @@ func (in *PlatformEventingSpec) DeepCopy()
*PlatformEventingSpec {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver,
writing into out. in must be non-nil.
+func (in *PlatformMonitoringOptionsSpec) DeepCopyInto(out
*PlatformMonitoringOptionsSpec) {
+ *out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver,
creating a new PlatformMonitoringOptionsSpec.
+func (in *PlatformMonitoringOptionsSpec) DeepCopy()
*PlatformMonitoringOptionsSpec {
+ if in == nil {
+ return nil
+ }
+ out := new(PlatformMonitoringOptionsSpec)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver,
writing into out. in must be non-nil.
func (in *PlatformPersistenceOptionsSpec) DeepCopyInto(out
*PlatformPersistenceOptionsSpec) {
*out = *in
@@ -1289,6 +1304,11 @@ func (in *SonataFlowPlatformSpec) DeepCopyInto(out
*SonataFlowPlatformSpec) {
*out = new(PropertyPlatformSpec)
(*in).DeepCopyInto(*out)
}
+ if in.Monitoring != nil {
+ in, out := &in.Monitoring, &out.Monitoring
+ *out = new(PlatformMonitoringOptionsSpec)
+ **out = **in
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver,
creating a new SonataFlowPlatformSpec.
diff --git a/bundle/manifests/sonataflow.org_sonataflowplatforms.yaml
b/bundle/manifests/sonataflow.org_sonataflowplatforms.yaml
index 65b97f91..d911e766 100644
--- a/bundle/manifests/sonataflow.org_sonataflowplatforms.yaml
+++ b/bundle/manifests/sonataflow.org_sonataflowplatforms.yaml
@@ -524,6 +524,14 @@ spec:
type: string
type: object
type: object
+ monitoring:
+ description: Settings for Prometheus monitoring
+ properties:
+ enabled:
+ description: Enabled indicates whether monitoring with
Prometheus
+ metrics is enabled
+ type: boolean
+ type: object
persistence:
description: |-
Persistence defines the platform persistence configuration.
When this field is set,
diff --git a/cmd/main.go b/cmd/main.go
index 7d625360..8a5bce67 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -27,14 +27,14 @@ import (
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/cfg"
"github.com/apache/incubator-kie-kogito-serverless-operator/version"
+ prometheus
"github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
+ "k8s.io/klog/v2/klogr"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
servingv1 "knative.dev/serving/pkg/apis/serving/v1"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"
- "k8s.io/klog/v2/klogr"
-
"k8s.io/klog/v2"
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
@@ -66,6 +66,7 @@ func init() {
utilruntime.Must(sourcesv1.AddToScheme(scheme))
utilruntime.Must(eventingv1.AddToScheme(scheme))
utilruntime.Must(servingv1.AddToScheme(scheme))
+ utilruntime.Must(prometheus.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}
diff --git a/config/crd/bases/sonataflow.org_sonataflowplatforms.yaml
b/config/crd/bases/sonataflow.org_sonataflowplatforms.yaml
index 7246a247..3d9aa2ad 100644
--- a/config/crd/bases/sonataflow.org_sonataflowplatforms.yaml
+++ b/config/crd/bases/sonataflow.org_sonataflowplatforms.yaml
@@ -524,6 +524,14 @@ spec:
type: string
type: object
type: object
+ monitoring:
+ description: Settings for Prometheus monitoring
+ properties:
+ enabled:
+ description: Enabled indicates whether monitoring with
Prometheus
+ metrics is enabled
+ type: boolean
+ type: object
persistence:
description: |-
Persistence defines the platform persistence configuration.
When this field is set,
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 99a3b01d..ecba276f 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -21,6 +21,17 @@ kind: ClusterRole
metadata:
name: manager-role
rules:
+- apiGroups:
+ - monitoring.coreos.com
+ resources:
+ - servicemonitors
+ verbs:
+ - create
+ - delete
+ - get
+ - list
+ - update
+ - watch
- apiGroups:
- sonataflow.org
resources:
diff --git a/go.work.sum b/go.work.sum
index 5b78b105..05c69792 100644
--- a/go.work.sum
+++ b/go.work.sum
@@ -2247,6 +2247,7 @@
github.com/google/go-containerregistry/pkg/authn/kubernetes v0.0.0-2023020916533
github.com/google/go-github v17.0.0+incompatible
h1:N0LgJ1j65A7kfXrZnUDaYCs/Sf4rEjNlfyDHW9dolSY=
github.com/google/go-github/v27 v27.0.6
h1:oiOZuBmGHvrGM1X9uNUAUlLgp5r1UUO/M/KnbHnLRlQ=
github.com/google/go-github/v27 v27.0.6/go.mod
h1:/0Gr8pJ55COkmv+S/yPKCczSkUPIM/LnFyubufRNIS0=
+github.com/google/go-jsonnet v0.18.0/go.mod
h1:C3fTzyVJDslXdiTqw/bTFk7vSGyCtH3MGRbDfvEwGd0=
github.com/google/go-pkcs11 v0.2.1-0.20230907215043-c6f79328ddf9
h1:OF1IPgv+F4NmqmJ98KTjdN97Vs1JxDPB3vbmYzV2dpk=
github.com/google/go-pkcs11 v0.2.1-0.20230907215043-c6f79328ddf9/go.mod
h1:6eQoGcuNJpa7jnd5pMGdkSaQpNDYvPlXWMcjXXThLlY=
github.com/google/go-querystring v1.0.0
h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk=
@@ -2588,6 +2589,7 @@ github.com/openzipkin/zipkin-go v0.3.0/go.mod
h1:4c3sLeE8xjNqehmF5RpAFLPLJxXscc0
github.com/openzipkin/zipkin-go v0.4.2
h1:zjqfqHjUpPmB3c1GlCvvgsM1G4LkvqQbBDueDOCg/jA=
github.com/openzipkin/zipkin-go v0.4.2/go.mod
h1:ZeVkFjuuBiSy13y8vpSDCjMi9GoI3hPpCJSBx/EYFhY=
github.com/operator-framework/api v0.1.1
h1:DbfxRJUPMQlQW6nbfoNzWLxv1rIv13Gt8GbsF2aglFk=
+github.com/operator-framework/operator-lib v0.11.0/go.mod
h1:RpyKhFAoG6DmKTDIwMuO6pI3LRc8IE9rxEYWy476o6g=
github.com/operator-framework/operator-registry v1.6.1
h1:Ow0Ko9DRIZ4xvH55vFAslcTy6A9FhlIeXvm+FhyRd84=
github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde
h1:x0TT0RDC7UhAVbbWWBzr41ElhJx5tXPWkIHA2HWPRuw=
github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde/go.mod
h1:nZgzbfBr3hhjoZnS66nKrHmduYNpc34ny7RK4z5/HM0=
diff --git a/internal/controller/knative/knative.go
b/internal/controller/knative/knative.go
index c99942cf..48aabd1e 100644
--- a/internal/controller/knative/knative.go
+++ b/internal/controller/knative/knative.go
@@ -30,7 +30,6 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
- "k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
@@ -44,7 +43,6 @@ import (
var servingClient clientservingv1.ServingV1Interface
var eventingClient clienteventingv1.EventingV1Interface
-var discoveryClient discovery.DiscoveryInterface
type Availability struct {
Eventing bool
@@ -92,23 +90,8 @@ func NewKnativeEventingClient(cfg *rest.Config)
(*clienteventingv1.EventingV1Cli
return clienteventingv1.NewForConfig(cfg)
}
-func getDiscoveryClient(cfg *rest.Config) (discovery.DiscoveryInterface,
error) {
- if discoveryClient == nil {
- if cli, err := discovery.NewDiscoveryClientForConfig(cfg); err
!= nil {
- return nil, err
- } else {
- discoveryClient = cli
- }
- }
- return discoveryClient, nil
-}
-
-func SetDiscoveryClient(cli discovery.DiscoveryInterface) {
- discoveryClient = cli
-}
-
func GetKnativeAvailability(cfg *rest.Config) (*Availability, error) {
- if cli, err := getDiscoveryClient(cfg); err != nil {
+ if cli, err := utils.GetDiscoveryClient(cfg); err != nil {
return nil, err
} else {
apiList, err := cli.ServerGroups()
diff --git a/utils/kubernetes/security.go
b/internal/controller/monitoring/monitoring.go
similarity index 56%
copy from utils/kubernetes/security.go
copy to internal/controller/monitoring/monitoring.go
index d17a51ac..0ce2a97e 100644
--- a/utils/kubernetes/security.go
+++ b/internal/controller/monitoring/monitoring.go
@@ -17,24 +17,36 @@
* under the License.
*/
-package kubernetes
+package monitoring
import (
- corev1 "k8s.io/api/core/v1"
-
+ operatorapi
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
+ "k8s.io/client-go/rest"
+)
+
+const (
+ prometheusGroup = "monitoring.coreos.com"
)
-func SecurityDefaults() *corev1.SecurityContext {
- return &corev1.SecurityContext{
- AllowPrivilegeEscalation: utils.Pbool(false),
- Privileged: utils.Pbool(false),
- RunAsNonRoot: utils.Pbool(true),
- SeccompProfile: &corev1.SeccompProfile{
- Type: corev1.SeccompProfileTypeRuntimeDefault,
- },
- Capabilities: &corev1.Capabilities{
- Drop: []corev1.Capability{"ALL"},
- },
+func GetPrometheusAvailability(cfg *rest.Config) (bool, error) {
+ cli, err := utils.GetDiscoveryClient(cfg)
+ if err != nil {
+ return false, err
+ }
+ apiList, err := cli.ServerGroups()
+ if err != nil {
+ return false, err
+ }
+ for _, group := range apiList.Groups {
+ if group.Name == prometheusGroup {
+ return true, nil
+ }
+
}
+ return false, nil
+}
+
+func IsMonitoringEnabled(pl *operatorapi.SonataFlowPlatform) bool {
+ return pl != nil && pl.Spec.Monitoring != nil &&
pl.Spec.Monitoring.Enabled
}
diff --git a/internal/controller/profiles/common/object_creators.go
b/internal/controller/profiles/common/object_creators.go
index b0a9ff4e..240c8aef 100644
--- a/internal/controller/profiles/common/object_creators.go
+++ b/internal/controller/profiles/common/object_creators.go
@@ -32,6 +32,7 @@ import (
cncfmodel "github.com/serverlessworkflow/sdk-go/v2/model"
"github.com/imdario/mergo"
+ prometheus
"github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -64,6 +65,8 @@ const (
deploymentKind = "Deployment"
k8sServiceAPIVersion = "v1"
k8sServiceKind = "Service"
+ k8sServicePortName = "web"
+ metricsServicePortPath = "/q/metrics"
)
// ObjectCreator is the func that creates the initial reference object, if the
object doesn't exist in the cluster, this one is created.
@@ -262,6 +265,7 @@ func ServiceCreator(workflow *operatorapi.SonataFlow)
(client.Object, error) {
Spec: corev1.ServiceSpec{
Selector: lbl,
Ports: []corev1.ServicePort{{
+ Name: k8sServicePortName,
Protocol: corev1.ProtocolTCP,
Port: defaultHTTPServicePort,
TargetPort:
variables.DefaultHTTPWorkflowPortIntStr,
@@ -439,10 +443,37 @@ func UserPropsConfigMapCreator(workflow
*operatorapi.SonataFlow) (client.Object,
// ManagedPropsConfigMapCreator creates an empty ConfigMap to hold the
external application properties
func ManagedPropsConfigMapCreator(workflow *operatorapi.SonataFlow, platform
*operatorapi.SonataFlowPlatform) (client.Object, error) {
-
props, err := properties.ApplicationManagedProperties(workflow,
platform)
if err != nil {
return nil, err
}
return workflowproj.CreateNewManagedPropsConfigMap(workflow, props), nil
}
+
+// ServiceMonitorCreator is an ObjectCreator that builds the Prometheus
ServiceMonitor for the workflow service.
+func ServiceMonitorCreator(workflow *operatorapi.SonataFlow) (client.Object,
error) {
+ lbl := workflowproj.GetMergedLabels(workflow)
+ spec := &prometheus.ServiceMonitorSpec{
+ Selector: metav1.LabelSelector{
+ MatchLabels: map[string]string{
+ workflowproj.LabelWorkflow:
workflow.Name,
+ workflowproj.LabelWorkflowNamespace:
workflow.Namespace,
+ },
+ },
+ Endpoints: []prometheus.Endpoint{
+ {
+ Port: k8sServicePortName,
+ Path: metricsServicePortPath,
+ },
+ },
+ }
+ serviceMonitor := &prometheus.ServiceMonitor{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: workflow.Name,
+ Namespace: workflow.Namespace,
+ Labels: lbl,
+ },
+ Spec: *spec,
+ }
+ return serviceMonitor, nil
+}
diff --git a/internal/controller/profiles/common/object_creators_test.go
b/internal/controller/profiles/common/object_creators_test.go
index 46f1c894..9e2b5274 100644
--- a/internal/controller/profiles/common/object_creators_test.go
+++ b/internal/controller/profiles/common/object_creators_test.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata"
"github.com/magiconair/properties"
+ prometheus
"github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@@ -941,3 +942,30 @@ func doTestDefaultContainer_WithPlatformPersistence(t
*testing.T, workflow *v1al
assert.Nil(t, container.Env)
}
}
+
+func TestEnsureWorkflowServiceMonitorIsCreatedWhenDeployedAsDeployment(t
*testing.T) {
+ workflow := test.GetVetEventSonataFlow(t.Name())
+ assert.Equal(t, workflow.IsKnativeDeployment(), false)
+ serviceMonitor, err := ServiceMonitorCreator(workflow)
+ assert.NoError(t, err)
+ assert.NotNil(t, serviceMonitor)
+ serviceMonitor.SetUID("1")
+ serviceMonitor.SetResourceVersion("1")
+ reflectServiceMonitor := serviceMonitor.(*prometheus.ServiceMonitor)
+
+ assert.NotNil(t, reflectServiceMonitor)
+ assert.NotNil(t, reflectServiceMonitor.Spec)
+ assert.Equal(t, len(reflectServiceMonitor.Spec.Selector.MatchLabels), 2)
+ assert.Equal(t,
reflectServiceMonitor.Spec.Selector.MatchLabels[workflowproj.LabelWorkflow],
workflow.Name)
+ assert.Equal(t,
reflectServiceMonitor.Spec.Selector.MatchLabels[workflowproj.LabelWorkflowNamespace],
workflow.Namespace)
+ assert.Equal(t, reflectServiceMonitor.Spec.Endpoints[0].Port,
k8sServicePortName)
+ assert.Equal(t, reflectServiceMonitor.Spec.Endpoints[0].Path,
metricsServicePortPath)
+ assert.NotNil(t, reflectServiceMonitor.GetLabels())
+ assert.Equal(t, reflectServiceMonitor.ObjectMeta.Labels,
map[string]string{
+ "app": workflow.Name,
+ "sonataflow.org/workflow-app": workflow.Name,
+ "sonataflow.org/workflow-namespace": workflow.Namespace,
+ "app.kubernetes.io/name": workflow.Name,
+ "app.kubernetes.io/component": "serverless-workflow",
+ "app.kubernetes.io/managed-by": "sonataflow-operator"})
+}
diff --git a/internal/controller/profiles/dev/profile_dev.go
b/internal/controller/profiles/dev/profile_dev.go
index 969d1c2d..58523273 100644
--- a/internal/controller/profiles/dev/profile_dev.go
+++ b/internal/controller/profiles/dev/profile_dev.go
@@ -78,6 +78,7 @@ func newObjectEnsurers(support *common.StateSupport)
*objectEnsurers {
return &objectEnsurers{
deployment:
common.NewObjectEnsurerWithPlatform(support.C, deploymentCreator),
service: common.NewObjectEnsurer(support.C,
serviceCreator),
+ serviceMonitor: common.NewObjectEnsurer(support.C,
common.ServiceMonitorCreator),
network: common.NewNoopObjectEnsurer(),
definitionConfigMap: common.NewObjectEnsurer(support.C,
workflowDefConfigMapCreator),
userPropsConfigMap: common.NewObjectEnsurer(support.C,
common.UserPropsConfigMapCreator),
@@ -89,6 +90,7 @@ func newObjectEnsurersOpenShift(support *common.StateSupport)
*objectEnsurers {
return &objectEnsurers{
deployment:
common.NewObjectEnsurerWithPlatform(support.C, deploymentCreator),
service: common.NewObjectEnsurer(support.C,
serviceCreator),
+ serviceMonitor: common.NewObjectEnsurer(support.C,
common.ServiceMonitorCreator),
network: common.NewObjectEnsurer(support.C,
common.OpenShiftRouteCreator),
definitionConfigMap: common.NewObjectEnsurer(support.C,
workflowDefConfigMapCreator),
userPropsConfigMap: common.NewObjectEnsurer(support.C,
common.UserPropsConfigMapCreator),
@@ -111,6 +113,7 @@ func newStatusEnrichersOpenShift(support
*common.StateSupport) *statusEnrichers
type objectEnsurers struct {
deployment common.ObjectEnsurerWithPlatform
service common.ObjectEnsurer
+ serviceMonitor common.ObjectEnsurer
network common.ObjectEnsurer
definitionConfigMap common.ObjectEnsurer
userPropsConfigMap common.ObjectEnsurer
diff --git a/internal/controller/profiles/dev/profile_dev_test.go
b/internal/controller/profiles/dev/profile_dev_test.go
index b349d338..7d4b4473 100644
--- a/internal/controller/profiles/dev/profile_dev_test.go
+++ b/internal/controller/profiles/dev/profile_dev_test.go
@@ -51,7 +51,6 @@ import (
clientruntime "sigs.k8s.io/controller-runtime/pkg/client"
"github.com/apache/incubator-kie-kogito-serverless-operator/api"
-
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative"
"github.com/apache/incubator-kie-kogito-serverless-operator/test"
)
@@ -60,7 +59,7 @@ func Test_OverrideStartupProbe(t *testing.T) {
client :=
test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()
- knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient())
+
utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient())
devReconciler := NewProfileReconciler(client, &rest.Config{},
test.NewFakeRecorder())
@@ -88,7 +87,7 @@ func Test_recoverFromFailureNoDeployment(t *testing.T) {
workflow.Status.Manager().MarkFalse(api.RunningConditionType,
api.DeploymentFailureReason, "")
client :=
test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()
- knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient())
+
utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient())
reconciler := NewProfileReconciler(client, &rest.Config{},
test.NewFakeRecorder())
// we are in failed state and have no objects
@@ -129,7 +128,7 @@ func Test_newDevProfile(t *testing.T) {
workflow := test.GetBaseSonataFlow(t.Name())
client :=
test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()
- knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient())
+
utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient())
devReconciler := NewProfileReconciler(client, &rest.Config{},
test.NewFakeRecorder())
@@ -212,7 +211,7 @@ func Test_newDevProfile(t *testing.T) {
func Test_devProfileImageDefaultsNoPlatform(t *testing.T) {
workflow := test.GetBaseSonataFlowWithDevProfile(t.Name())
client :=
test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()
- knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient())
+
utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient())
devReconciler := NewProfileReconciler(client, &rest.Config{},
test.NewFakeRecorder())
@@ -231,7 +230,7 @@ func Test_devProfileWithImageSnapshotOverrideWithPlatform(t
*testing.T) {
platform :=
test.GetBasePlatformWithDevBaseImageInReadyPhase(workflow.Namespace)
client :=
test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow,
platform).WithStatusSubresource(workflow, platform).Build()
- knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient())
+
utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient())
devReconciler := NewProfileReconciler(client, &rest.Config{},
test.NewFakeRecorder())
@@ -250,7 +249,7 @@ func
Test_devProfileWithWPlatformWithoutDevBaseImageAndWithBaseImage(t *testing.
platform :=
test.GetBasePlatformWithBaseImageInReadyPhase(workflow.Namespace)
client :=
test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow,
platform).WithStatusSubresource(workflow, platform).Build()
- knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient())
+
utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient())
devReconciler := NewProfileReconciler(client, &rest.Config{},
test.NewFakeRecorder())
@@ -269,7 +268,7 @@ func
Test_devProfileWithPlatformWithoutDevBaseImageAndWithoutBaseImage(t *testin
platform := test.GetBasePlatformInReadyPhase(workflow.Namespace)
client :=
test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow,
platform).WithStatusSubresource(workflow, platform).Build()
- knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient())
+
utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient())
devReconciler := NewProfileReconciler(client, &rest.Config{},
test.NewFakeRecorder())
@@ -289,7 +288,7 @@ func Test_newDevProfileWithExternalConfigMaps(t *testing.T)
{
operatorapi.ConfigMapWorkflowResource{ConfigMap:
corev1.LocalObjectReference{Name: configmapName}, WorkflowPath: "routes"})
client :=
test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()
- knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient())
+
utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient())
devReconciler := NewProfileReconciler(client, &rest.Config{},
test.NewFakeRecorder())
@@ -404,7 +403,7 @@ func Test_VolumeWithCapitalizedPaths(t *testing.T) {
workflow :=
test.GetSonataFlow(test.SonataFlowGreetingsWithStaticResourcesCR, t.Name())
client :=
test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow,
configMap).WithStatusSubresource(workflow, configMap).Build()
- knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient())
+
utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient())
devReconciler := NewProfileReconciler(client, &rest.Config{},
test.NewFakeRecorder())
diff --git a/internal/controller/profiles/dev/states_dev.go
b/internal/controller/profiles/dev/states_dev.go
index 3386cb27..86c34c85 100644
--- a/internal/controller/profiles/dev/states_dev.go
+++ b/internal/controller/profiles/dev/states_dev.go
@@ -34,6 +34,7 @@ import (
"github.com/apache/incubator-kie-kogito-serverless-operator/api"
operatorapi
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/monitoring"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/platform"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common/constants"
@@ -111,6 +112,14 @@ func (e *ensureRunningWorkflowState) Do(ctx
context.Context, workflow *operatora
}
objs = append(objs, service)
+ serviceMonitor, err := e.ensureServiceMonitor(ctx, workflow, pl)
+ if err != nil {
+ return ctrl.Result{RequeueAfter:
constants.RequeueAfterFailure}, objs, err
+ }
+ if serviceMonitor != nil {
+ objs = append(objs, serviceMonitor)
+ }
+
route, _, err := e.ensurers.network.Ensure(ctx, workflow)
if err != nil {
return ctrl.Result{RequeueAfter:
constants.RequeueAfterFailure}, objs, err
@@ -142,6 +151,14 @@ func (e *ensureRunningWorkflowState) Do(ctx
context.Context, workflow *operatora
return ctrl.Result{RequeueAfter: constants.RequeueAfterIsRunning},
objs, nil
}
+func (e *ensureRunningWorkflowState) ensureServiceMonitor(ctx context.Context,
workflow *operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform)
(client.Object, error) {
+ if monitoring.IsMonitoringEnabled(pl) {
+ serviceMonitor, _, err := e.ensurers.serviceMonitor.Ensure(ctx,
workflow)
+ return serviceMonitor, err
+ }
+ return nil, nil
+}
+
func (e *ensureRunningWorkflowState) PostReconcile(ctx context.Context,
workflow *operatorapi.SonataFlow) error {
//By default, we don't want to perform anything after the
reconciliation, and so we will simply return no error
return nil
diff --git a/internal/controller/profiles/gitops/profile_gitops_test.go
b/internal/controller/profiles/gitops/profile_gitops_test.go
index 45892a70..77ac9685 100644
--- a/internal/controller/profiles/gitops/profile_gitops_test.go
+++ b/internal/controller/profiles/gitops/profile_gitops_test.go
@@ -23,8 +23,8 @@ import (
"github.com/apache/incubator-kie-kogito-serverless-operator/api"
operatorapi
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
-
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative"
"github.com/apache/incubator-kie-kogito-serverless-operator/test"
+ "github.com/apache/incubator-kie-kogito-serverless-operator/utils"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@@ -43,7 +43,7 @@ func Test_Reconciler_ProdOps(t *testing.T) {
WithRuntimeObjects(workflow).
WithStatusSubresource(workflow,
&operatorapi.SonataFlowBuild{}).Build()
- knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient())
+
utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient())
result, err := NewProfileForOpsReconciler(client, &rest.Config{},
test.NewFakeRecorder()).Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
diff --git a/internal/controller/profiles/monitoring/monitoring.go
b/internal/controller/profiles/monitoring/monitoring.go
new file mode 100644
index 00000000..86b3fee0
--- /dev/null
+++ b/internal/controller/profiles/monitoring/monitoring.go
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package monitoring
+
+import (
+ "context"
+
+ operatorapi
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/monitoring"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common"
+ "github.com/apache/incubator-kie-kogito-serverless-operator/log"
+ "k8s.io/klog/v2"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// Compile-time check that *monitoringObjectManager implements MonitoringEventingHandler.
+var _ MonitoringEventingHandler = &monitoringObjectManager{}
+
+// monitoringObjectManager is the MonitoringEventingHandler implementation of this package.
+type monitoringObjectManager struct {
+ // serviceMonitor is the ensurer Ensure uses for the workflow's ServiceMonitor.
+ serviceMonitor common.ObjectEnsurer
+ // StateSupport is embedded; Ensure reads its Cfg field and NewMonitoringHandler reads its C field.
+ *common.StateSupport
+}
+
+// NewMonitoringHandler builds a MonitoringEventingHandler on top of the given
+// state support. Its ServiceMonitor ensurer is created from support.C and
+// common.ServiceMonitorCreator.
+func NewMonitoringHandler(support *common.StateSupport) MonitoringEventingHandler {
+	ensurer := common.NewObjectEnsurer(support.C, common.ServiceMonitorCreator)
+	return &monitoringObjectManager{
+		StateSupport:   support,
+		serviceMonitor: ensurer,
+	}
+}
+
+// MonitoringEventingHandler ensures the monitoring objects of a workflow.
+type MonitoringEventingHandler interface {
+ // Ensure reconciles the monitoring objects for the given workflow and returns the ensured objects.
+ // NOTE(review): the implementation in this file only handles the ServiceMonitor; confirm against other implementers, if any.
+ Ensure(ctx context.Context, workflow *operatorapi.SonataFlow)
([]client.Object, error)
+}
+
+// Ensure makes sure the ServiceMonitor of the given workflow exists when
+// monitoring.GetPrometheusAvailability reports true for the manager's Cfg.
+//
+// It returns the ensured objects, or a nil slice when Prometheus is not
+// available or the ensurer produced no object. Any error from the availability
+// check or from the ensurer is returned as is.
+func (k monitoringObjectManager) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow) ([]client.Object, error) {
+	monitoringAvail, err := monitoring.GetPrometheusAvailability(k.Cfg)
+	if err != nil {
+		// InfoS does not interpret format verbs; the error must be passed as a key/value pair.
+		klog.V(log.I).InfoS("Error checking Prometheus availability", "error", err)
+		return nil, err
+	}
+	if !monitoringAvail {
+		return nil, nil
+	}
+	serviceMonitor, _, err := k.serviceMonitor.Ensure(ctx, workflow)
+	if err != nil {
+		return nil, err
+	}
+	if serviceMonitor == nil {
+		return nil, nil
+	}
+	return []client.Object{serviceMonitor}, nil
+}
diff --git a/internal/controller/profiles/preview/deployment_handler.go
b/internal/controller/profiles/preview/deployment_handler.go
index 4cac9bc8..17cc49ed 100644
--- a/internal/controller/profiles/preview/deployment_handler.go
+++ b/internal/controller/profiles/preview/deployment_handler.go
@@ -29,6 +29,7 @@ import (
"github.com/apache/incubator-kie-kogito-serverless-operator/api"
operatorapi
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/monitoring"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/platform"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/platform/services"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common"
@@ -157,12 +158,21 @@ func (d *DeploymentReconciler) ensureObjects(ctx
context.Context, workflow *oper
return reconcile.Result{}, nil, err
}
+ objs := []client.Object{deployment, managedPropsCM, service}
eventingObjs, err := common.NewKnativeEventingHandler(d.StateSupport,
pl).Ensure(ctx, workflow)
if err != nil {
return reconcile.Result{}, nil, err
}
+ objs = append(objs, eventingObjs...)
+
+ serviceMonitor, err := d.ensureServiceMonitor(ctx, workflow, pl)
+ if err != nil {
+ return reconcile.Result{}, nil, err
+ }
+ if serviceMonitor != nil {
+ objs = append(objs, serviceMonitor)
+ }
- objs := []client.Object{deployment, managedPropsCM, service}
if deploymentOp == controllerutil.OperationResultCreated {
workflow.Status.Manager().MarkFalse(api.RunningConditionType,
api.WaitingForDeploymentReason, "")
if _, err := d.PerformStatusUpdate(ctx, workflow); err != nil {
@@ -170,11 +180,17 @@ func (d *DeploymentReconciler) ensureObjects(ctx
context.Context, workflow *oper
}
return reconcile.Result{RequeueAfter:
constants.RequeueAfterFollowDeployment, Requeue: true}, objs, nil
}
- objs = append(objs, eventingObjs...)
-
return reconcile.Result{}, objs, nil
}
+// ensureServiceMonitor ensures the ServiceMonitor of the given workflow when
+// monitoring is enabled for the platform, using the ensurer selected by
+// ServiceMonitorByDeploymentModel. It returns (nil, nil) when monitoring is
+// not enabled, so callers must nil-check the returned object.
+func (d *DeploymentReconciler) ensureServiceMonitor(ctx context.Context, workflow *operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform) (client.Object, error) {
+	if !monitoring.IsMonitoringEnabled(pl) {
+		return nil, nil
+	}
+	ensurer := d.ensurers.ServiceMonitorByDeploymentModel(workflow)
+	monitor, _, err := ensurer.Ensure(ctx, workflow)
+	return monitor, err
+}
+
func (d *DeploymentReconciler) deploymentModelMutateVisitors(
workflow *operatorapi.SonataFlow,
plf *operatorapi.SonataFlowPlatform,
diff --git a/internal/controller/profiles/preview/deployment_handler_test.go
b/internal/controller/profiles/preview/deployment_handler_test.go
index 1f3d8d13..f97ac91b 100644
--- a/internal/controller/profiles/preview/deployment_handler_test.go
+++ b/internal/controller/profiles/preview/deployment_handler_test.go
@@ -23,8 +23,8 @@ import (
"github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata"
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
-
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative"
"github.com/apache/incubator-kie-kogito-serverless-operator/test"
+ "github.com/apache/incubator-kie-kogito-serverless-operator/utils"
"github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj"
"github.com/magiconair/properties"
"github.com/stretchr/testify/assert"
@@ -48,7 +48,7 @@ func Test_CheckDeploymentModelIsKnative(t *testing.T) {
WithStatusSubresource(workflow).
Build()
stateSupport := fakeReconcilerSupport(cli)
- knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient())
+
utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient())
handler := NewDeploymentReconciler(stateSupport,
NewObjectEnsurers(stateSupport))
result, objects, err := handler.ensureObjects(context.TODO(), workflow,
"")
@@ -75,7 +75,7 @@ func Test_CheckPodTemplateChangesReflectDeployment(t
*testing.T) {
WithStatusSubresource(workflow).
Build()
stateSupport := fakeReconcilerSupport(client)
- knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient())
+
utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient())
handler := NewDeploymentReconciler(stateSupport,
NewObjectEnsurers(stateSupport))
result, objects, err := handler.Reconcile(context.TODO(), workflow)
@@ -111,7 +111,7 @@ func Test_CheckDeploymentRolloutAfterCMChange(t *testing.T)
{
WithStatusSubresource(workflow).
Build()
stateSupport := fakeReconcilerSupport(client)
- knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient())
+
utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient())
handler := NewDeploymentReconciler(stateSupport,
NewObjectEnsurers(stateSupport))
result, objects, err := handler.Reconcile(context.TODO(), workflow)
@@ -174,7 +174,7 @@ func Test_CheckDeploymentUnchangedAfterCMChangeOtherKeys(t
*testing.T) {
WithStatusSubresource(workflow).
Build()
stateSupport := fakeReconcilerSupport(client)
- knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient())
+
utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient())
handler := NewDeploymentReconciler(stateSupport,
NewObjectEnsurers(stateSupport))
result, objects, err := handler.Reconcile(context.TODO(), workflow)
diff --git a/internal/controller/profiles/preview/profile_preview.go
b/internal/controller/profiles/preview/profile_preview.go
index 2f483ab2..4dc95966 100644
--- a/internal/controller/profiles/preview/profile_preview.go
+++ b/internal/controller/profiles/preview/profile_preview.go
@@ -58,7 +58,9 @@ type ObjectEnsurers struct {
// kservice Knative Serving deployment for this ensurer. Don't call it
directly, use DeploymentByDeploymentModel instead
kservice common.ObjectEnsurerWithPlatform
// service for this ensurer. Don't call it directly, use
ServiceByDeploymentModel instead
- service common.ObjectEnsurer
+ service common.ObjectEnsurer
+ // serviceMonitor for this ensurer. Don't call it directly, use
ServiceMonitorByDeploymentModel instead
+ serviceMonitor common.ObjectEnsurer
userPropsConfigMap common.ObjectEnsurer
managedPropsConfigMap common.ObjectEnsurerWithPlatform
}
@@ -80,12 +82,22 @@ func (o *ObjectEnsurers) ServiceByDeploymentModel(workflow
*v1alpha08.SonataFlow
return o.service
}
+// ServiceMonitorByDeploymentModel picks the service monitor ensurer that matches the SonataFlow deployment model.
+// Workflows deployed as a Knative service get a no-op ensurer, so no service monitor is created for them.
+func (o *ObjectEnsurers) ServiceMonitorByDeploymentModel(workflow *v1alpha08.SonataFlow) common.ObjectEnsurer {
+	if !workflow.IsKnativeDeployment() {
+		return o.serviceMonitor
+	}
+	return common.NewNoopObjectEnsurer()
+}
+
// NewObjectEnsurers common.ObjectEnsurer(s) for the preview profile.
func NewObjectEnsurers(support *common.StateSupport) *ObjectEnsurers {
return &ObjectEnsurers{
deployment:
common.NewObjectEnsurerWithPlatform(support.C, common.DeploymentCreator),
kservice:
common.NewObjectEnsurerWithPlatform(support.C, common.KServiceCreator),
service: common.NewObjectEnsurer(support.C,
common.ServiceCreator),
+ serviceMonitor: common.NewObjectEnsurer(support.C,
common.ServiceMonitorCreator),
userPropsConfigMap: common.NewObjectEnsurer(support.C,
common.UserPropsConfigMapCreator),
managedPropsConfigMap:
common.NewObjectEnsurerWithPlatform(support.C,
common.ManagedPropsConfigMapCreator),
}
diff --git a/internal/controller/profiles/preview/profile_preview_test.go
b/internal/controller/profiles/preview/profile_preview_test.go
index 7e472349..aed417bd 100644
--- a/internal/controller/profiles/preview/profile_preview_test.go
+++ b/internal/controller/profiles/preview/profile_preview_test.go
@@ -26,9 +26,11 @@ import (
"github.com/apache/incubator-kie-kogito-serverless-operator/api"
operatorapi
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
-
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common"
"github.com/apache/incubator-kie-kogito-serverless-operator/test"
+ "github.com/apache/incubator-kie-kogito-serverless-operator/utils"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj"
+ prometheus
"github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@@ -50,7 +52,7 @@ func Test_Reconciler_ProdCustomPod(t *testing.T) {
client := test.NewSonataFlowClientBuilder().
WithRuntimeObjects(workflow, build, platform).
WithStatusSubresource(workflow, build, platform).Build()
- knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient())
+
utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient())
_, err := NewProfileReconciler(client, &rest.Config{},
test.NewFakeRecorder()).Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
@@ -82,7 +84,7 @@ func Test_reconcilerProdBuildConditions(t *testing.T) {
client := test.NewSonataFlowClientBuilder().
WithRuntimeObjects(workflow, platform).
WithStatusSubresource(workflow, platform,
&operatorapi.SonataFlowBuild{}).Build()
- knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient())
+
utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient())
result, err := NewProfileReconciler(client, &rest.Config{},
test.NewFakeRecorder()).Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
@@ -139,12 +141,13 @@ func Test_reconcilerProdBuildConditions(t *testing.T) {
func Test_deployWorkflowReconciliationHandler_handleObjects(t *testing.T) {
workflow := test.GetBaseSonataFlow(t.Name())
platform := test.GetBasePlatformInReadyPhase(t.Name())
+ platform.Spec.Monitoring =
&operatorapi.PlatformMonitoringOptionsSpec{Enabled: true}
build := test.GetLocalSucceedSonataFlowBuild(workflow.Name,
workflow.Namespace)
- client := test.NewSonataFlowClientBuilder().
+ client := test.NewKogitoClientBuilderWithOpenShift().
WithRuntimeObjects(workflow, platform, build).
WithStatusSubresource(workflow, platform, build).
Build()
- knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient())
+
utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient())
handler := &deployWithBuildWorkflowState{
StateSupport: fakeReconcilerSupport(client),
ensurers: NewObjectEnsurers(&common.StateSupport{C:
client}),
@@ -153,7 +156,7 @@ func
Test_deployWorkflowReconciliationHandler_handleObjects(t *testing.T) {
assert.Greater(t, result.RequeueAfter, int64(0))
assert.NoError(t, err)
assert.NotNil(t, result)
- assert.Len(t, objects, 3)
+ assert.Len(t, objects, 4)
deployment := &appsv1.Deployment{}
err = client.Get(context.TODO(),
clientruntime.ObjectKeyFromObject(workflow), deployment)
@@ -164,6 +167,18 @@ func
Test_deployWorkflowReconciliationHandler_handleObjects(t *testing.T) {
assert.NoError(t, err)
assert.False(t, workflow.Status.IsReady())
assert.Equal(t, api.WaitingForDeploymentReason,
workflow.Status.GetTopLevelCondition().Reason)
+
+ serviceMonitor := &prometheus.ServiceMonitor{}
+ err = client.Get(context.TODO(),
clientruntime.ObjectKeyFromObject(workflow), serviceMonitor)
+ assert.NoError(t, err)
+ assert.NotEmpty(t, serviceMonitor.Spec)
+ assert.NotEmpty(t, serviceMonitor.Spec.Selector)
+ assert.Equal(t, len(serviceMonitor.Spec.Selector.MatchLabels), 2)
+ assert.Equal(t,
serviceMonitor.Spec.Selector.MatchLabels[workflowproj.LabelWorkflow],
workflow.Name)
+ assert.Equal(t,
serviceMonitor.Spec.Selector.MatchLabels[workflowproj.LabelWorkflowNamespace],
workflow.Namespace)
+ assert.Equal(t, len(serviceMonitor.Spec.Endpoints), 1)
+ assert.Equal(t, serviceMonitor.Spec.Endpoints[0].Port, "web")
+ assert.Equal(t, serviceMonitor.Spec.Endpoints[0].Path, "/q/metrics")
}
func Test_GenerationAnnotationCheck(t *testing.T) {
@@ -173,7 +188,7 @@ func Test_GenerationAnnotationCheck(t *testing.T) {
client := test.NewSonataFlowClientBuilder().
WithRuntimeObjects(workflow, platform).
WithStatusSubresource(workflow, platform,
&operatorapi.SonataFlowBuild{}).Build()
- knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient())
+
utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient())
handler := &deployWithBuildWorkflowState{
StateSupport: fakeReconcilerSupport(client),
ensurers: NewObjectEnsurers(&common.StateSupport{C:
client}),
diff --git a/internal/controller/sonataflow_controller.go
b/internal/controller/sonataflow_controller.go
index a7734bec..7724a637 100644
--- a/internal/controller/sonataflow_controller.go
+++ b/internal/controller/sonataflow_controller.go
@@ -24,6 +24,7 @@ import (
"fmt"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/monitoring"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
servingv1 "knative.dev/serving/pkg/apis/serving/v1"
@@ -37,6 +38,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"
+ prometheus
"github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -68,6 +70,7 @@ type SonataFlowReconciler struct {
//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflows,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflows/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflows/finalizers,verbs=update
+//+kubebuilder:rbac:groups="monitoring.coreos.com",resources=servicemonitors,verbs=get;list;watch;create;update;delete
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
@@ -253,5 +256,13 @@ func (r *SonataFlowReconciler) SetupWithManager(mgr
ctrl.Manager) error {
Owns(&sourcesv1.SinkBinding{}).
Watches(&eventingv1.Trigger{},
handler.EnqueueRequestsFromMapFunc(knative.MapTriggerToPlatformRequests))
}
+ promAvail, err := monitoring.GetPrometheusAvailability(mgr.GetConfig())
+ if err != nil {
+ return err
+ }
+ if promAvail {
+ builder = builder.Owns(&prometheus.ServiceMonitor{})
+ }
+
return builder.Complete(r)
}
diff --git a/internal/controller/sonataflowplatform_controller.go
b/internal/controller/sonataflowplatform_controller.go
index 75f3d1c5..6126e165 100644
--- a/internal/controller/sonataflowplatform_controller.go
+++ b/internal/controller/sonataflowplatform_controller.go
@@ -25,6 +25,7 @@ import (
"time"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/monitoring"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
"k8s.io/klog/v2"
@@ -136,6 +137,16 @@ func (r *SonataFlowPlatformReconciler) Reconcile(ctx
context.Context, req reconc
return reconcile.Result{}, nil
}
+ if monitoring.IsMonitoringEnabled(&instance) {
+ monitoringAvail, err :=
monitoring.GetPrometheusAvailability(r.Config)
+ if err != nil {
+ return reconcile.Result{}, err
+ }
+ if !monitoringAvail {
+ r.Recorder.Event(&instance, corev1.EventTypeWarning,
"PrometheusNotAvailable", fmt.Sprintf("Monitoring is enabled in platform %s,
but Prometheus is not installed", instance.Name))
+ }
+ }
+
for _, a := range actions {
cli, _ := clientr.FromCtrlClientSchemeAndConfig(r.Client,
r.Scheme, r.Config)
a.InjectClient(cli)
diff --git a/internal/controller/sonataflowplatform_controller_test.go
b/internal/controller/sonataflowplatform_controller_test.go
index db567769..dbadd2e8 100644
--- a/internal/controller/sonataflowplatform_controller_test.go
+++ b/internal/controller/sonataflowplatform_controller_test.go
@@ -25,7 +25,6 @@ import (
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/clusterplatform"
-
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/platform/services"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common/constants"
"github.com/apache/incubator-kie-kogito-serverless-operator/test"
@@ -874,7 +873,7 @@ func TestSonataFlowPlatformController(t *testing.T) {
// Create a fake client to mock API calls.
cl :=
test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp,
broker).WithStatusSubresource(ksp, broker).Build()
utils.SetClient(cl)
-
knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient())
+
utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient())
// Create a SonataFlowPlatformReconciler object with the scheme
and fake client.
r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(),
&rest.Config{}, &record.FakeRecorder{}}
@@ -975,7 +974,7 @@ func TestSonataFlowPlatformController(t *testing.T) {
// Create a fake client to mock API calls.
cl :=
test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp, broker,
brokerDataIndexSource, brokerJobsServiceSource,
brokerJobsServiceSink).WithStatusSubresource(ksp, broker,
brokerDataIndexSource, brokerJobsServiceSource, brokerJobsServiceSink).Build()
utils.SetClient(cl)
-
knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient())
+
utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient())
// Create a SonataFlowPlatformReconciler object with the scheme
and fake client.
r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(),
&rest.Config{}, &record.FakeRecorder{}}
diff --git a/operator.yaml b/operator.yaml
index 98f7959a..4780b998 100644
--- a/operator.yaml
+++ b/operator.yaml
@@ -1054,6 +1054,14 @@ spec:
type: string
type: object
type: object
+ monitoring:
+ description: Settings for Prometheus monitoring
+ properties:
+ enabled:
+ description: Enabled indicates whether monitoring with
Prometheus
+ metrics is enabled
+ type: boolean
+ type: object
persistence:
description: |-
Persistence defines the platform persistence configuration.
When this field is set,
@@ -27727,6 +27735,17 @@ kind: ClusterRole
metadata:
name: sonataflow-operator-manager-role
rules:
+- apiGroups:
+ - monitoring.coreos.com
+ resources:
+ - servicemonitors
+ verbs:
+ - create
+ - delete
+ - get
+ - list
+ - update
+ - watch
- apiGroups:
- sonataflow.org
resources:
diff --git a/test/e2e/e2e_suite_test.go b/test/e2e/e2e_suite_test.go
index e249d399..8fbd8243 100644
--- a/test/e2e/e2e_suite_test.go
+++ b/test/e2e/e2e_suite_test.go
@@ -101,6 +101,10 @@ var _ = BeforeSuite(func() {
} else {
GinkgoWriter.Println("Fetch pre-built workflows images in the
cluster")
err = fetchImageTagsBuiltWorkflows(workflows)
+ if err != nil {
+ GinkgoWriter.Println("Failed to fetch pre-built
workflows images, try to build them")
+ err = deployWorkflowsAndWaitForBuild(workflows)
+ }
Expect(err).NotTo(HaveOccurred())
}
diff --git a/test/e2e/helpers.go b/test/e2e/helpers.go
index 1f1334e2..4bf45146 100644
--- a/test/e2e/helpers.go
+++ b/test/e2e/helpers.go
@@ -241,7 +241,36 @@ func verifySchemaMigration(data, name string) bool {
strings.Contains(data, fmt.Sprintf("Schema \"%s\" is up
to date. No migration necessary", name)))
}
-func waitForPodRestartCompletion(label, ns string) {
+// verifyKSinkInjection reports whether every pod selected by label in namespace
+// ns mentions K_SINK in its JSON description. It returns false when kubectl
+// fails, when no pod matches yet, or when at least one pod lacks K_SINK.
+// Progress and failures are written to GinkgoWriter.
+func verifyKSinkInjection(label, ns string) bool {
+	listCmd := exec.Command("kubectl", "get", "pod", "-n", ns, "-l", label, "-o", "jsonpath={.items[*].metadata.name}")
+	listOut, err := utils.Run(listCmd)
+	if err != nil {
+		GinkgoWriter.Println(fmt.Errorf("failed to get pods: %v", err))
+		return false
+	}
+	pods := strings.Fields(string(listOut))
+	if len(pods) == 0 {
+		// No pod has been created yet.
+		GinkgoWriter.Println("no pods found to check K_SINK")
+		return false
+	}
+	GinkgoWriter.Println(fmt.Sprintf("pods found: %s", pods))
+	for _, name := range pods {
+		podOut, err := utils.Run(exec.Command("kubectl", "get", "pod", name, "-n", ns, "-o", "json"))
+		if err != nil {
+			GinkgoWriter.Println(fmt.Errorf("failed to get pod: %v", err))
+			return false
+		}
+		podJSON := string(podOut)
+		GinkgoWriter.Println(podJSON)
+		if strings.Contains(podJSON, "K_SINK") {
+			continue
+		}
+		// This pod does not have K_SINK injected.
+		GinkgoWriter.Println(fmt.Sprintf("Pod does not have K_SINK injected: %s", podJSON))
+		return false
+	}
+	return true
+}
+
+func waitForPodRestartCompletion(label, ns string) (podRunning string) {
EventuallyWithOffset(1, func() bool {
cmd := exec.Command("kubectl", "get", "pod", "-n", ns, "-l",
label, "-o", "jsonpath={.items[*].metadata.name}")
out, err := utils.Run(cmd)
@@ -257,8 +286,11 @@ func waitForPodRestartCompletion(label, ns string) {
GinkgoWriter.Println("multiple pods found")
return false // multiple pods found, wait for other
pods to terminate
}
+ podRunning = podNames[0]
return true
- }, 1*time.Minute, 5).Should(BeTrue())
+ }, 10*time.Minute, 5).Should(BeTrue())
+
+ return
}
func verifyTrigger(triggers []operatorapi.SonataFlowPlatformTriggerRef,
namePrefix, path, ns, broker string) error {
@@ -284,7 +316,7 @@ func verifyTriggerData(name, ns, path, broker string) error
{
if len(data) == 3 && broker == data[0] && strings.HasSuffix(data[1],
path) && data[2] == "True" {
return nil
}
- return fmt.Errorf("failed to verify trigger %v, data=%s", name,
string(out))
+ return fmt.Errorf("failed to verify trigger %v with namespace %v, path
%v, broker %s, and received data=%s", name, ns, path, broker, string(out))
}
func verifySinkBinding(name, ns, broker string) error {
@@ -299,3 +331,72 @@ func verifySinkBinding(name, ns, broker string) error {
}
return fmt.Errorf("failed to verify sinkbinding %v, data=%s", name,
string(out))
}
+
+// getWorkflowId extracts the string field "id" from the JSON object embedded in
+// resp. The object is taken as the text between the first "{" and the last "}"
+// of resp, so surrounding non-JSON output is tolerated.
+//
+// It returns an error when resp contains no such span, when the span is not
+// valid JSON, or when it has no string "id" field.
+func getWorkflowId(resp string) (string, error) {
+	// First find the json data
+	start := strings.Index(resp, "{")
+	end := strings.LastIndex(resp, "}")
+	if start < 0 || end < start {
+		// Slicing with these indexes would panic; report the problem instead.
+		return "", fmt.Errorf("no json data found in response: %q", resp)
+	}
+	// Retrieve the id from json data
+	m := make(map[string]interface{})
+	if err := json.Unmarshal([]byte(resp[start:end+1]), &m); err != nil {
+		return "", err
+	}
+	if id, ok := m["id"].(string); ok {
+		return id, nil
+	}
+	return "", fmt.Errorf("failed to find workflow id")
+}
+
+// getMetricValue extracts a metric value from the JSON object embedded in resp.
+// The object is taken as the text between the first "{" and the last "}" of
+// resp, and the value is read from data.result[0].value[1], which must be a
+// string (presumably a Prometheus query response; confirm against the caller).
+//
+// The raw response is echoed to stdout. An error, never a panic, is returned
+// when the response does not have the expected shape.
+func getMetricValue(resp string) (string, error) {
+	fmt.Println(resp)
+	start := strings.Index(resp, "{")
+	end := strings.LastIndex(resp, "}")
+	if start < 0 || end < start {
+		// Slicing with these indexes would panic; report the problem instead.
+		return "", fmt.Errorf("no valid response data received")
+	}
+
+	// Retrieve the metric value from json data
+	m := make(map[string]interface{})
+	if err := json.Unmarshal([]byte(resp[start:end+1]), &m); err != nil {
+		return "", err
+	}
+	// Every step uses a checked type assertion so an unexpected shape yields an error.
+	data, ok := m["data"].(map[string]interface{})
+	if !ok {
+		return "", fmt.Errorf("no valid response data received")
+	}
+	metrics, ok := data["result"].([]interface{})
+	if !ok {
+		return "", fmt.Errorf("no valid response data received")
+	}
+	if len(metrics) == 0 {
+		return "", fmt.Errorf("no valid metric data retrieved")
+	}
+	metric, ok := metrics[0].(map[string]interface{})
+	if !ok {
+		return "", fmt.Errorf("failed to get metric value")
+	}
+	values, ok := metric["value"].([]interface{})
+	if !ok || len(values) < 2 {
+		return "", fmt.Errorf("failed to get metric value")
+	}
+	val, ok := values[1].(string)
+	if !ok {
+		return "", fmt.Errorf("failed to get metric value")
+	}
+	return val, nil
+}
+
+// getPodNameAfterWorkflowInstCreation lists the pods of workflow name in
+// namespace ns with kubectl and returns the first pod whose name contains
+// "-00002-deployment-", provided its Ready condition is "True".
+// NOTE(review): the substring presumably identifies the second Knative revision; confirm.
+//
+// It returns an error when kubectl fails, when the matching pod is not ready,
+// or when no matching pod appears in the output. The command and its raw
+// output are echoed to stdout.
+func getPodNameAfterWorkflowInstCreation(name, ns string) (string, error) {
+	labels := fmt.Sprintf("sonataflow.org/workflow-app=%s,sonataflow.org/workflow-namespace=%s", name, ns)
+	// Each pod is printed as "<name> <ready-status>;".
+	cmd := exec.Command("kubectl", "get", "pod", "-n", ns, "-l", labels, "-o=jsonpath='{range .items[*]}{.metadata.name} {.status.conditions[?(@.type=='Ready')].status}{';'}{end}'")
+	fmt.Println(cmd.String())
+	out, err := utils.Run(cmd)
+	if err != nil {
+		return "", err
+	}
+	fmt.Println(string(out))
+	for _, line := range strings.Split(string(out), ";") {
+		res := strings.Fields(line)
+		if len(res) != 2 || !strings.Contains(res[0], "-00002-deployment-") {
+			continue
+		}
+		if res[1] != "True" {
+			return "", fmt.Errorf("pod %s is not ready, status=%s", res[0], res[1])
+		}
+		return res[0], nil
+	}
+	return "", fmt.Errorf("invalid data received: %s", string(out))
+}
diff --git a/config/rbac/role.yaml
b/test/e2e/testdata/workflows/prometheus/k8s_deployment/01-sonataflow_platform.yaml
similarity index 52%
copy from config/rbac/role.yaml
copy to
test/e2e/testdata/workflows/prometheus/k8s_deployment/01-sonataflow_platform.yaml
index 99a3b01d..3028b3d4 100644
--- a/config/rbac/role.yaml
+++
b/test/e2e/testdata/workflows/prometheus/k8s_deployment/01-sonataflow_platform.yaml
@@ -15,44 +15,11 @@
# specific language governing permissions and limitations
# under the License.
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: ClusterRole
+
+apiVersion: sonataflow.org/v1alpha08
+kind: SonataFlowPlatform
metadata:
- name: manager-role
-rules:
-- apiGroups:
- - sonataflow.org
- resources:
- - sonataflowbuilds
- - sonataflowclusterplatforms
- - sonataflowplatforms
- - sonataflows
- verbs:
- - create
- - delete
- - get
- - list
- - patch
- - update
- - watch
-- apiGroups:
- - sonataflow.org
- resources:
- - sonataflowbuilds/finalizers
- - sonataflowclusterplatforms/finalizers
- - sonataflowplatforms/finalizers
- - sonataflows/finalizers
- verbs:
- - update
-- apiGroups:
- - sonataflow.org
- resources:
- - sonataflowbuilds/status
- - sonataflowclusterplatforms/status
- - sonataflowplatforms/status
- - sonataflows/status
- verbs:
- - get
- - patch
- - update
+ name: sonataflow-platform
+spec:
+ monitoring:
+ enabled: true
\ No newline at end of file
diff --git a/config/rbac/role.yaml
b/test/e2e/testdata/workflows/prometheus/k8s_deployment/02-sonataflow_greetings.yaml
similarity index 52%
copy from config/rbac/role.yaml
copy to
test/e2e/testdata/workflows/prometheus/k8s_deployment/02-sonataflow_greetings.yaml
index 99a3b01d..7daa18d4 100644
--- a/config/rbac/role.yaml
+++
b/test/e2e/testdata/workflows/prometheus/k8s_deployment/02-sonataflow_greetings.yaml
@@ -15,44 +15,27 @@
# specific language governing permissions and limitations
# under the License.
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: ClusterRole
+
+apiVersion: sonataflow.org/v1alpha08
+kind: SonataFlow
metadata:
- name: manager-role
-rules:
-- apiGroups:
- - sonataflow.org
- resources:
- - sonataflowbuilds
- - sonataflowclusterplatforms
- - sonataflowplatforms
- - sonataflows
- verbs:
- - create
- - delete
- - get
- - list
- - patch
- - update
- - watch
-- apiGroups:
- - sonataflow.org
- resources:
- - sonataflowbuilds/finalizers
- - sonataflowclusterplatforms/finalizers
- - sonataflowplatforms/finalizers
- - sonataflows/finalizers
- verbs:
- - update
-- apiGroups:
- - sonataflow.org
- resources:
- - sonataflowbuilds/status
- - sonataflowclusterplatforms/status
- - sonataflowplatforms/status
- - sonataflows/status
- verbs:
- - get
- - patch
- - update
+ name: greetings
+ annotations:
+ sonataflow.org/description: Greetings example on k8s!
+ sonataflow.org/version: 0.0.1
+ sonataflow.org/profile: gitops
+ labels:
+ test: test
+spec:
+ podTemplate:
+ replicas: 0
+ container:
+ image: replaceme
+ flow:
+ start: HelloWorld
+ states:
+ - name: HelloWorld
+ type: inject
+ data:
+ message: Hello World
+ end: true
diff --git a/config/rbac/role.yaml
b/test/e2e/testdata/workflows/prometheus/k8s_deployment/kustomization.yaml
similarity index 52%
copy from config/rbac/role.yaml
copy to test/e2e/testdata/workflows/prometheus/k8s_deployment/kustomization.yaml
index 99a3b01d..e16a6471 100644
--- a/config/rbac/role.yaml
+++ b/test/e2e/testdata/workflows/prometheus/k8s_deployment/kustomization.yaml
@@ -15,44 +15,10 @@
# specific language governing permissions and limitations
# under the License.
----
-apiVersion: rbac.authorization.k8s.io/v1
-kind: ClusterRole
-metadata:
- name: manager-role
-rules:
-- apiGroups:
- - sonataflow.org
- resources:
- - sonataflowbuilds
- - sonataflowclusterplatforms
- - sonataflowplatforms
- - sonataflows
- verbs:
- - create
- - delete
- - get
- - list
- - patch
- - update
- - watch
-- apiGroups:
- - sonataflow.org
- resources:
- - sonataflowbuilds/finalizers
- - sonataflowclusterplatforms/finalizers
- - sonataflowplatforms/finalizers
- - sonataflows/finalizers
- verbs:
- - update
-- apiGroups:
- - sonataflow.org
- resources:
- - sonataflowbuilds/status
- - sonataflowclusterplatforms/status
- - sonataflowplatforms/status
- - sonataflows/status
- verbs:
- - get
- - patch
- - update
+
+resources:
+- 01-sonataflow_platform.yaml
+- 02-sonataflow_greetings.yaml
+
+sortOptions:
+ order: fifo
\ No newline at end of file
diff --git a/test/e2e/workflow_test.go b/test/e2e/workflow_test.go
index e7e0a1aa..21b9582c 100644
--- a/test/e2e/workflow_test.go
+++ b/test/e2e/workflow_test.go
@@ -136,9 +136,7 @@ var _ = Describe("Workflow Non-Persistence Use Cases :: ",
Label("flows-ephemera
return err
}, 3*time.Minute, time.Second).Should(Succeed())
})
-
})
-
})
var _ = Describe("Workflow Persistence Use Cases :: ",
Label("flows-persistence"), Ordered, func() {
@@ -168,7 +166,6 @@ var _ = Describe("Workflow Persistence Use Cases :: ",
Label("flows-persistence"
}
})
-
DescribeTable("when deploying a SonataFlow CR with PostgreSQL
persistence", func(testcaseDir string, withPersistence bool, waitKSinkInjection
bool) {
By("Deploy the CR")
var manifests []byte
@@ -274,5 +271,96 @@ var _ = Describe("Workflow Persistence Use Cases :: ",
Label("flows-persistence"
Entry("defined from the sonataflow platform as reference and
without DI and JS", test.GetPathFromE2EDirectory("workflows", "persistence",
"from_platform_without_di_and_js_services"), true, false),
Entry("defined from the sonataflow platform as reference but
not required by the workflow", test.GetPathFromE2EDirectory("workflows",
"persistence", "from_platform_with_no_persistence_required"), false, false),
)
+})
+
+var _ = Describe("Workflow Monitoring Use Cases :: ",
Label("flows-monitoring"), Ordered, func() {
+
+ var targetNamespace string
+ BeforeEach(func() {
+ targetNamespace = fmt.Sprintf("test-%d",
rand.Intn(randomIntRange)+1)
+ err := kubectlCreateNamespace(targetNamespace)
+ Expect(err).NotTo(HaveOccurred())
+ })
+ AfterEach(func() {
+ // Remove resources in test namespace
+ if !CurrentSpecReport().Failed() && len(targetNamespace) > 0 {
+ cmd := exec.Command("kubectl", "delete", "sonataflow",
"--all", "-n", targetNamespace, "--wait")
+ _, err := utils.Run(cmd)
+ Expect(err).NotTo(HaveOccurred())
+ err = kubectlDeleteNamespace(targetNamespace)
+ Expect(err).NotTo(HaveOccurred())
+ }
+ })
+ Describe("basic workflow monitoring", func() {
+ It("should create servicemonitor for workflow deployed as k8s
deployment when monitoring enabled in platform", func() {
+ By("Deploy the SonataFlowPlatform CR and the workflow")
+ var manifests []byte
+ EventuallyWithOffset(1, func() error {
+ var err error
+ cmd := exec.Command("kubectl", "kustomize",
test.GetPathFromE2EDirectory("workflows", "prometheus", "k8s_deployment"))
+ manifests, err = utils.Run(cmd)
+ return err
+ }, time.Minute, time.Second).Should(Succeed())
+ cmd := exec.Command("kubectl", "create", "-n",
targetNamespace, "-f", "-")
+ cmd.Stdin = bytes.NewBuffer(manifests)
+ _, err := utils.Run(cmd)
+ Expect(err).NotTo(HaveOccurred())
+
+ workflowName := prebuiltWorkflows.Greetings.Name
+ By("Replacing the image with a prebuilt one and
rollout")
+ EventuallyWithOffset(1, func() error {
+ return
kubectlPatchSonataFlowImageAndRollout(targetNamespace, workflowName,
prebuiltWorkflows.Greetings.Tag)
+ }, 3*time.Minute, time.Second).Should(Succeed())
+
+ By("check the workflow is in running state")
+ EventuallyWithOffset(1, func() bool { return
verifyWorkflowIsInRunningState(workflowName, targetNamespace) },
10*time.Minute, 30*time.Second).Should(BeTrue())
+ By("Retrieve the name of the running pod for the
workflow")
+ labels :=
fmt.Sprintf("sonataflow.org/workflow-app=%s,sonataflow.org/workflow-namespace=%s",
workflowName, targetNamespace)
+ podName := waitForPodRestartCompletion(labels,
targetNamespace)
+
+ By("check service monitor has been created")
+ EventuallyWithOffset(1, func() bool {
+ cmd := exec.Command("kubectl", "get",
"servicemonitor", workflowName, "-n", targetNamespace)
+ _, err := utils.Run(cmd)
+ if err != nil {
+ GinkgoWriter.Println(fmt.Errorf("failed
to get servicemonitor: %v", err))
+ return false
+ }
+ return true
+ }, 1*time.Minute, 5).Should(BeTrue())
+
+ By("trigger a new workflow instance")
+ EventuallyWithOffset(1, func() bool {
+ curlCmd := fmt.Sprintf("curl -X POST -H
'Content-Type: application/json' -H 'Accept: */*' -d '{\"workflowdata\": {}}'
http://%s/%s", workflowName, workflowName)
+ cmd := exec.Command("kubectl", "exec", podName,
"-c", "workflow", "-n", targetNamespace, "--", "/bin/bash", "-c", curlCmd)
+ resp, err := utils.Run(cmd)
+ if err != nil {
+ GinkgoWriter.Println(fmt.Errorf("failed
to trigger workflow instance: %v", err))
+ return false
+ }
+ GinkgoWriter.Println(fmt.Errorf("Response: %v",
string(resp)))
+ return strings.Contains(string(resp), "Hello
World")
+ }, 2*time.Minute, 5).Should(BeTrue())
+
+ By("check prometheus server has workflow instance
metrics")
+ EventuallyWithOffset(1, func() bool {
+ curlCmd := fmt.Sprintf("curl
http://prometheus-operated.default:9090/api/v1/query --data-urlencode
'query=kogito_process_instance_duration_seconds_count{job=\"%s\",namespace=\"%s\"}'",
workflowName, targetNamespace)
+ GinkgoWriter.Println(curlCmd)
+ cmd := exec.Command("kubectl", "exec", podName,
"-c", "workflow", "-n", targetNamespace, "--", "/bin/bash", "-c", curlCmd)
+ resp, err := utils.Run(cmd)
+ if err != nil {
+ GinkgoWriter.Println(fmt.Errorf("failed
to get metrics from prometheus server: %v", err))
+ return false
+ }
+ if val, err := getMetricValue(string(resp));
err != nil {
+ GinkgoWriter.Println(err)
+ return false
+ } else {
+ GinkgoWriter.Println("metric value
found:", val)
+ return val == "1"
+ }
+ }, 5*time.Minute, 5).Should(BeTrue())
+ })
+ })
})
diff --git a/test/kubernetes_cli.go b/test/kubernetes_cli.go
index 810dd193..efa6b2d0 100644
--- a/test/kubernetes_cli.go
+++ b/test/kubernetes_cli.go
@@ -25,6 +25,7 @@ import (
"testing"
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
+ prometheus
"github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
servingv1 "knative.dev/serving/pkg/apis/serving/v1"
@@ -103,6 +104,7 @@ func NewKogitoClientBuilderWithOpenShift()
*SonataFlowClientBuilder {
utilruntime.Must(operatorapi.AddToScheme(s))
utilruntime.Must(eventingv1.AddToScheme(s))
utilruntime.Must(sourcesv1.AddToScheme(s))
+ utilruntime.Must(prometheus.AddToScheme(s))
builder := fake.NewClientBuilder().WithScheme(s)
return &SonataFlowClientBuilder{
innerBuilder: builder,
diff --git a/config/rbac/role.yaml b/test/testdata/grafana.yaml
similarity index 53%
copy from config/rbac/role.yaml
copy to test/testdata/grafana.yaml
index 99a3b01d..c4a2f7e9 100644
--- a/config/rbac/role.yaml
+++ b/test/testdata/grafana.yaml
@@ -15,44 +15,16 @@
# specific language governing permissions and limitations
# under the License.
+
---
-apiVersion: rbac.authorization.k8s.io/v1
-kind: ClusterRole
+apiVersion: grafana.integreatly.org/v1beta1
+kind: Grafana
metadata:
- name: manager-role
-rules:
-- apiGroups:
- - sonataflow.org
- resources:
- - sonataflowbuilds
- - sonataflowclusterplatforms
- - sonataflowplatforms
- - sonataflows
- verbs:
- - create
- - delete
- - get
- - list
- - patch
- - update
- - watch
-- apiGroups:
- - sonataflow.org
- resources:
- - sonataflowbuilds/finalizers
- - sonataflowclusterplatforms/finalizers
- - sonataflowplatforms/finalizers
- - sonataflows/finalizers
- verbs:
- - update
-- apiGroups:
- - sonataflow.org
- resources:
- - sonataflowbuilds/status
- - sonataflowclusterplatforms/status
- - sonataflowplatforms/status
- - sonataflows/status
- verbs:
- - get
- - patch
- - update
+ name: grafana
+ labels:
+ dashboards: "grafana"
+spec:
+ config:
+ security:
+ admin_user: root
+ admin_password: secret
diff --git a/config/rbac/role.yaml b/test/testdata/prometheus.yaml
similarity index 50%
copy from config/rbac/role.yaml
copy to test/testdata/prometheus.yaml
index 99a3b01d..c01a35d1 100644
--- a/config/rbac/role.yaml
+++ b/test/testdata/prometheus.yaml
@@ -15,44 +15,59 @@
# specific language governing permissions and limitations
# under the License.
+
+apiVersion: monitoring.coreos.com/v1
+kind: Prometheus
+metadata:
+ name: prometheus
+spec:
+ serviceAccountName: prometheus
+ serviceMonitorNamespaceSelector: {}
+ serviceMonitorSelector: {}
+ podMonitorSelector: {}
+ resources:
+ requests:
+ memory: 400Mi
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+ name: prometheus
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
- name: manager-role
+ name: prometheus
rules:
-- apiGroups:
- - sonataflow.org
+- apiGroups: [""]
resources:
- - sonataflowbuilds
- - sonataflowclusterplatforms
- - sonataflowplatforms
- - sonataflows
- verbs:
- - create
- - delete
- - get
- - list
- - patch
- - update
- - watch
-- apiGroups:
- - sonataflow.org
+ - nodes
+ - nodes/metrics
+ - services
+ - endpoints
+ - pods
+ verbs: ["get", "list", "watch"]
+- apiGroups: [""]
resources:
- - sonataflowbuilds/finalizers
- - sonataflowclusterplatforms/finalizers
- - sonataflowplatforms/finalizers
- - sonataflows/finalizers
- verbs:
- - update
+ - configmaps
+ verbs: ["get"]
- apiGroups:
- - sonataflow.org
+ - networking.k8s.io
resources:
- - sonataflowbuilds/status
- - sonataflowclusterplatforms/status
- - sonataflowplatforms/status
- - sonataflows/status
- verbs:
- - get
- - patch
- - update
+ - ingresses
+ verbs: ["get", "list", "watch"]
+- nonResourceURLs: ["/metrics"]
+ verbs: ["get"]
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+ name: prometheus
+roleRef:
+ apiGroup: rbac.authorization.k8s.io
+ kind: ClusterRole
+ name: prometheus
+subjects:
+- kind: ServiceAccount
+ name: prometheus
+ namespace: default
diff --git a/test/yaml.go b/test/yaml.go
index 276f9b9b..752f0562 100644
--- a/test/yaml.go
+++ b/test/yaml.go
@@ -323,12 +323,13 @@ func getProjectDir() string {
return projectDir
}
-func CreateFakeKnativeDiscoveryClient() discovery.DiscoveryInterface {
+func CreateFakeKnativeAndMonitoringDiscoveryClient()
discovery.DiscoveryInterface {
return &discfake.FakeDiscovery{
Fake: &clienttesting.Fake{
Resources: []*metav1.APIResourceList{
{GroupVersion: "serving.knative.dev/v1"},
{GroupVersion: "eventing.knative.dev/v1"},
+ {GroupVersion: "monitoring.coreos.com/v1"},
},
},
}
diff --git a/utils/client.go b/utils/client.go
index 3e7bc567..3f700183 100644
--- a/utils/client.go
+++ b/utils/client.go
@@ -17,9 +17,14 @@
package utils
-import "sigs.k8s.io/controller-runtime/pkg/client"
+import (
+ "k8s.io/client-go/discovery"
+ "k8s.io/client-go/rest"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+)
var k8sClient client.Client
+var discoveryClient discovery.DiscoveryInterface
// TODO: consider refactor the internals as we progress adding features to
rely on this client instead of passing it through all the functions
@@ -33,3 +38,18 @@ func GetClient() client.Client {
func SetClient(client client.Client) {
k8sClient = client
}
+
+func GetDiscoveryClient(cfg *rest.Config) (discovery.DiscoveryInterface,
error) {
+ if discoveryClient == nil {
+ if cli, err := discovery.NewDiscoveryClientForConfig(cfg); err
!= nil {
+ return nil, err
+ } else {
+ discoveryClient = cli
+ }
+ }
+ return discoveryClient, nil
+}
+
+func SetDiscoveryClient(cli discovery.DiscoveryInterface) {
+ discoveryClient = cli
+}
diff --git a/utils/kubernetes/security.go b/utils/kubernetes/security.go
index d17a51ac..50914c1d 100644
--- a/utils/kubernetes/security.go
+++ b/utils/kubernetes/security.go
@@ -20,9 +20,8 @@
package kubernetes
import (
- corev1 "k8s.io/api/core/v1"
-
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
+ corev1 "k8s.io/api/core/v1"
)
func SecurityDefaults() *corev1.SecurityContext {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]