This is an automated email from the ASF dual-hosted git repository.

wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 039f28f7 [YUNIKORN-2010] Application metric incorrect (#667)
039f28f7 is described below

commit 039f28f7268e7e78054f64bd876a30b317adc294
Author: Yu-Lin Chen <kh87...@gmail.com>
AuthorDate: Fri Nov 3 11:58:09 2023 +1100

    [YUNIKORN-2010] Application metric incorrect (#667)
    
    The web UI shows a negative number of running applications in the
    application history. The root cause is incorrect tracking of running
    applications in the metrics.
    
    * remove 'enter_Resuming' metrics changes
    * remove '*_Completing' metrics changes
    * use 'enter_Running' for metrics changes
    * enhance metrics trace and add more test cases
    * correctly update accepted application metric
    
    Closes: #667
    
    Signed-off-by: Wilfred Spiegelenburg <wilfr...@apache.org>
---
 pkg/metrics/init.go                             |   9 +-
 pkg/metrics/metrics_collector.go                |   2 +-
 pkg/metrics/queue.go                            |  46 +++++
 pkg/metrics/scheduler.go                        |  20 +-
 pkg/scheduler/objects/application_state.go      |  40 ++--
 pkg/scheduler/objects/application_state_test.go | 242 ++++++++++++++++++++++++
 6 files changed, 337 insertions(+), 22 deletions(-)

diff --git a/pkg/metrics/init.go b/pkg/metrics/init.go
index e7cdeb56..7a4f5b4e 100644
--- a/pkg/metrics/init.go
+++ b/pkg/metrics/init.go
@@ -47,11 +47,16 @@ type Metrics struct {
 
 type CoreQueueMetrics interface {
        IncQueueApplicationsAccepted()
+       GetQueueApplicationsAccepted() (int, error)
        IncQueueApplicationsRejected()
+       GetQueueApplicationsRejected() (int, error)
        IncQueueApplicationsRunning()
        DecQueueApplicationsRunning()
+       GetQueueApplicationsRunning() (int, error)
        IncQueueApplicationsFailed()
+       GetQueueApplicationsFailed() (int, error)
        IncQueueApplicationsCompleted()
+       GetQueueApplicationsCompleted() (int, error)
        IncAllocatedContainer()
        IncReleasedContainer()
        AddReleasedContainers(value int)
@@ -102,6 +107,7 @@ type CoreSchedulerMetrics interface {
        // Metrics Ops related to TotalApplicationsRejected
        IncTotalApplicationsRejected()
        AddTotalApplicationsRejected(value int)
+       GetTotalApplicationsRejected() (int, error)
 
        // Metrics Ops related to TotalApplicationsRunning
        IncTotalApplicationsRunning()
@@ -109,7 +115,7 @@ type CoreSchedulerMetrics interface {
        DecTotalApplicationsRunning()
        SubTotalApplicationsRunning(value int)
        SetTotalApplicationsRunning(value int)
-       getTotalApplicationsRunning() (int, error)
+       GetTotalApplicationsRunning() (int, error)
 
        // Metrics Ops related to TotalApplicationsFailed
        IncTotalApplicationsFailed()
@@ -120,6 +126,7 @@ type CoreSchedulerMetrics interface {
        DecTotalApplicationsCompleted()
        SubTotalApplicationsCompleted(value int)
        SetTotalApplicationsCompleted(value int)
+       GetTotalApplicationsCompleted() (int, error)
 
        // Metrics Ops related to ActiveNodes
        IncActiveNodes()
diff --git a/pkg/metrics/metrics_collector.go b/pkg/metrics/metrics_collector.go
index ee9651ed..12bcb13e 100644
--- a/pkg/metrics/metrics_collector.go
+++ b/pkg/metrics/metrics_collector.go
@@ -67,7 +67,7 @@ func (u *internalMetricsCollector) StartService() {
 func (u *internalMetricsCollector) store() {
        log.Log(log.Metrics).Debug("Adding current status to historical 
partition data")
 
-       totalAppsRunning, err := m.scheduler.getTotalApplicationsRunning()
+       totalAppsRunning, err := m.scheduler.GetTotalApplicationsRunning()
        if err != nil {
                log.Log(log.Metrics).Warn("Could not encode totalApplications 
metric.", zap.Error(err))
                totalAppsRunning = -1
diff --git a/pkg/metrics/queue.go b/pkg/metrics/queue.go
index 8e0dc70a..cb227cf4 100644
--- a/pkg/metrics/queue.go
+++ b/pkg/metrics/queue.go
@@ -20,6 +20,7 @@ package metrics
 
 import (
        "github.com/prometheus/client_golang/prometheus"
+       dto "github.com/prometheus/client_model/go"
        "go.uber.org/zap"
 
        "github.com/apache/yunikorn-core/pkg/log"
@@ -94,22 +95,67 @@ func (m *QueueMetrics) DecQueueApplicationsRunning() {
        m.appMetrics.With(prometheus.Labels{"state": "running"}).Dec()
 }
 
+func (m *QueueMetrics) GetQueueApplicationsRunning() (int, error) {
+       metricDto := &dto.Metric{}
+       err := m.appMetrics.With(prometheus.Labels{"state": 
"running"}).Write(metricDto)
+       if err == nil {
+               return int(*metricDto.Gauge.Value), nil
+       }
+       return -1, err
+}
+
 func (m *QueueMetrics) IncQueueApplicationsAccepted() {
        m.appMetrics.With(prometheus.Labels{"state": "accepted"}).Inc()
 }
 
+func (m *QueueMetrics) GetQueueApplicationsAccepted() (int, error) {
+       metricDto := &dto.Metric{}
+       err := m.appMetrics.With(prometheus.Labels{"state": 
"accepted"}).Write(metricDto)
+       if err == nil {
+               return int(*metricDto.Gauge.Value), nil
+       }
+       return -1, err
+}
+
 func (m *QueueMetrics) IncQueueApplicationsRejected() {
        m.appMetrics.With(prometheus.Labels{"state": "rejected"}).Inc()
 }
 
+func (m *QueueMetrics) GetQueueApplicationsRejected() (int, error) {
+       metricDto := &dto.Metric{}
+       err := m.appMetrics.With(prometheus.Labels{"state": 
"rejected"}).Write(metricDto)
+       if err == nil {
+               return int(*metricDto.Gauge.Value), nil
+       }
+       return -1, err
+}
+
 func (m *QueueMetrics) IncQueueApplicationsFailed() {
        m.appMetrics.With(prometheus.Labels{"state": "failed"}).Inc()
 }
 
+func (m *QueueMetrics) GetQueueApplicationsFailed() (int, error) {
+       metricDto := &dto.Metric{}
+       err := m.appMetrics.With(prometheus.Labels{"state": 
"failed"}).Write(metricDto)
+       if err == nil {
+               return int(*metricDto.Gauge.Value), nil
+       }
+       return -1, err
+}
+
 func (m *QueueMetrics) IncQueueApplicationsCompleted() {
        m.appMetrics.With(prometheus.Labels{"state": "completed"}).Inc()
 }
 
+func (m *QueueMetrics) GetQueueApplicationsCompleted() (int, error) {
+       metricDto := &dto.Metric{}
+       err := m.appMetrics.With(prometheus.Labels{"state": 
"completed"}).Write(metricDto)
+       if err == nil {
+               return int(*metricDto.Gauge.Value), nil
+       }
+       return -1, err
+}
+
 func (m *QueueMetrics) IncAllocatedContainer() {
        m.containerMetrics.With(prometheus.Labels{"state": "allocated"}).Inc()
 }
diff --git a/pkg/metrics/scheduler.go b/pkg/metrics/scheduler.go
index b7721eae..0134c3c7 100644
--- a/pkg/metrics/scheduler.go
+++ b/pkg/metrics/scheduler.go
@@ -264,6 +264,15 @@ func (m *SchedulerMetrics) 
AddTotalApplicationsRejected(value int) {
        m.applicationSubmission.With(prometheus.Labels{"result": 
"rejected"}).Add(float64(value))
 }
 
+func (m *SchedulerMetrics) GetTotalApplicationsRejected() (int, error) {
+       metricDto := &dto.Metric{}
+       err := m.applicationSubmission.With(prometheus.Labels{"result": 
"rejected"}).Write(metricDto)
+       if err == nil {
+               return int(*metricDto.Counter.Value), nil
+       }
+       return -1, err
+}
+
 func (m *SchedulerMetrics) IncTotalApplicationsRunning() {
        m.application.With(prometheus.Labels{"state": "running"}).Inc()
 }
@@ -284,7 +293,7 @@ func (m *SchedulerMetrics) 
SetTotalApplicationsRunning(value int) {
        m.application.With(prometheus.Labels{"state": 
"running"}).Set(float64(value))
 }
 
-func (m *SchedulerMetrics) getTotalApplicationsRunning() (int, error) {
+func (m *SchedulerMetrics) GetTotalApplicationsRunning() (int, error) {
        metricDto := &dto.Metric{}
        err := m.application.With(prometheus.Labels{"state": 
"running"}).Write(metricDto)
        if err == nil {
@@ -317,6 +326,15 @@ func (m *SchedulerMetrics) 
SetTotalApplicationsCompleted(value int) {
        m.application.With(prometheus.Labels{"state": 
"completed"}).Set(float64(value))
 }
 
+func (m *SchedulerMetrics) GetTotalApplicationsCompleted() (int, error) {
+       metricDto := &dto.Metric{}
+       err := m.application.With(prometheus.Labels{"state": 
"completed"}).Write(metricDto)
+       if err == nil {
+               return int(*metricDto.Gauge.Value), nil
+       }
+       return -1, err
+}
+
 func (m *SchedulerMetrics) IncActiveNodes() {
        m.node.With(prometheus.Labels{"state": "active"}).Inc()
 }
diff --git a/pkg/scheduler/objects/application_state.go 
b/pkg/scheduler/objects/application_state.go
index 35dbc6b3..925d80f4 100644
--- a/pkg/scheduler/objects/application_state.go
+++ b/pkg/scheduler/objects/application_state.go
@@ -169,29 +169,29 @@ func NewAppState() *fsm.FSM {
                        fmt.Sprintf("enter_%s", Starting.String()): func(_ 
context.Context, event *fsm.Event) {
                                app := event.Args[0].(*Application) 
//nolint:errcheck
                                app.startTime = time.Now()
-                               app.queue.incRunningApps(app.ApplicationID)
                                app.setStateTimer(app.startTimeout, 
app.stateMachine.Current(), RunApplication)
+                               app.queue.incRunningApps(app.ApplicationID)
                                
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRunning()
                                
metrics.GetSchedulerMetrics().IncTotalApplicationsRunning()
                        },
-                       fmt.Sprintf("enter_%s", Resuming.String()): func(_ 
context.Context, event *fsm.Event) {
-                               app := event.Args[0].(*Application) 
//nolint:errcheck
-                               
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
-                               
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
-                       },
-                       fmt.Sprintf("enter_%s", Completing.String()): func(_ 
context.Context, event *fsm.Event) {
-                               app := event.Args[0].(*Application) 
//nolint:errcheck
-                               if event.Src == Starting.String() {
+                       fmt.Sprintf("leave_%s", Starting.String()): func(_ 
context.Context, event *fsm.Event) {
+                               if event.Dst != Running.String() {
+                                       app := event.Args[0].(*Application) 
//nolint:errcheck
                                        app.queue.decRunningApps()
                                        
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
                                        
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
                                }
+                       },
+                       fmt.Sprintf("enter_%s", Completing.String()): func(_ 
context.Context, event *fsm.Event) {
+                               app := event.Args[0].(*Application) 
//nolint:errcheck
                                app.setStateTimer(completingTimeout, 
app.stateMachine.Current(), CompleteApplication)
                        },
                        fmt.Sprintf("leave_%s", New.String()): func(_ 
context.Context, event *fsm.Event) {
-                               app := event.Args[0].(*Application) 
//nolint:errcheck
-                               
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsAccepted()
-                               
metrics.GetSchedulerMetrics().IncTotalApplicationsAccepted()
+                               if event.Dst != Rejected.String() {
+                                       app := event.Args[0].(*Application) 
//nolint:errcheck
+                                       
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsAccepted()
+                                       
metrics.GetSchedulerMetrics().IncTotalApplicationsAccepted()
+                               }
                        },
                        fmt.Sprintf("enter_%s", Rejected.String()): func(_ 
context.Context, event *fsm.Event) {
                                app := event.Args[0].(*Application) 
//nolint:errcheck
@@ -205,19 +205,21 @@ func NewAppState() *fsm.FSM {
                                        app.rejectedMessage = 
event.Args[1].(string) //nolint:errcheck
                                }
                        },
+                       fmt.Sprintf("enter_%s", Running.String()): func(_ 
context.Context, event *fsm.Event) {
+                               app := event.Args[0].(*Application) 
//nolint:errcheck
+                               // account for going back into running state
+                               if event.Src == Completing.String() {
+                                       
app.queue.incRunningApps(app.ApplicationID)
+                                       
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRunning()
+                                       
metrics.GetSchedulerMetrics().IncTotalApplicationsRunning()
+                               }
+                       },
                        fmt.Sprintf("leave_%s", Running.String()): func(_ 
context.Context, event *fsm.Event) {
                                app := event.Args[0].(*Application) 
//nolint:errcheck
                                app.queue.decRunningApps()
                                
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
                                
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
                        },
-                       fmt.Sprintf("leave_%s", Completing.String()): func(_ 
context.Context, event *fsm.Event) {
-                               app := event.Args[0].(*Application) 
//nolint:errcheck
-                               // account for going back into running state
-                               if event.Dst == Running.String() {
-                                       
app.queue.incRunningApps(app.ApplicationID)
-                               }
-                       },
                        fmt.Sprintf("enter_%s", Completed.String()): func(_ 
context.Context, event *fsm.Event) {
                                app := event.Args[0].(*Application) 
//nolint:errcheck
                                
metrics.GetSchedulerMetrics().IncTotalApplicationsCompleted()
diff --git a/pkg/scheduler/objects/application_state_test.go 
b/pkg/scheduler/objects/application_state_test.go
index 2f1bf564..7a07f808 100644
--- a/pkg/scheduler/objects/application_state_test.go
+++ b/pkg/scheduler/objects/application_state_test.go
@@ -27,6 +27,7 @@ import (
 
        "github.com/apache/yunikorn-core/pkg/common"
        "github.com/apache/yunikorn-core/pkg/events"
+       "github.com/apache/yunikorn-core/pkg/metrics"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
@@ -303,3 +304,244 @@ func TestAppStateTransitionEvents(t *testing.T) {
        isStateChangeEvent(t, appInfo, si.EventRecord_APP_EXPIRED, records[6])
        isStateChangeEvent(t, appInfo, si.EventRecord_APP_RESUMING, records[7])
 }
+
+// Test to verify metrics after applications state transition
+// app-00001: New -> Resuming -> Accepted -> Starting -> Running -> 
Completing-> Completed
+// app-00002: New -> Accepted -> Starting -> Completing -> Running -> 
Failing-> Failed
+// app-00003: New -> Accepted -> Starting -> Failing -> Failed
+// app-00004: New -> Rejected
+// Final metrics will be: 0 running, 3 accepted, 1 completed, 2 failed and 1 
rejected applications
+func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen
+       queue := createQueue(t, "root.metrics")
+       metrics.GetSchedulerMetrics().Reset()
+       // app-00001: New -> Resuming -> Accepted --> Starting -> Running -> 
Completing-> Completed
+       app := newApplication("app-00001", "default", "root.metrics")
+       app.SetQueue(queue)
+       assertState(t, app, nil, New.String())
+       assertTotalAppsRunningMetrics(t, 0)
+       assertTotalAppsCompletedMetrics(t, 0)
+       assertTotalAppsRejectedMetrics(t, 0)
+       // New -> Resuming
+       err := app.HandleApplicationEvent(ResumeApplication)
+       assertState(t, app, err, Resuming.String())
+       assertTotalAppsRunningMetrics(t, 0)
+       assertTotalAppsCompletedMetrics(t, 0)
+       assertTotalAppsRejectedMetrics(t, 0)
+       assertQueueRunningApps(t, app, 0)
+       assertQueueApplicationsRunningMetrics(t, app, 0)
+       assertQueueApplicationsAcceptedMetrics(t, app, 1)
+       assertQueueApplicationsRejectedMetrics(t, app, 0)
+       assertQueueApplicationsFailedMetrics(t, app, 0)
+       assertQueueApplicationsCompletedMetrics(t, app, 0)
+       // Resuming -> Accepted
+       err = app.HandleApplicationEvent(RunApplication)
+       assertState(t, app, err, Accepted.String())
+       assertTotalAppsRunningMetrics(t, 0)
+       assertTotalAppsCompletedMetrics(t, 0)
+       assertTotalAppsRejectedMetrics(t, 0)
+       assertQueueRunningApps(t, app, 0)
+       assertQueueApplicationsRunningMetrics(t, app, 0)
+       assertQueueApplicationsAcceptedMetrics(t, app, 1)
+       assertQueueApplicationsRejectedMetrics(t, app, 0)
+       assertQueueApplicationsFailedMetrics(t, app, 0)
+       assertQueueApplicationsCompletedMetrics(t, app, 0)
+       // Accepted -> Starting
+       err = app.HandleApplicationEvent(RunApplication)
+       assertState(t, app, err, Starting.String())
+       assertTotalAppsRunningMetrics(t, 1)
+       assertTotalAppsCompletedMetrics(t, 0)
+       assertTotalAppsRejectedMetrics(t, 0)
+       assertQueueRunningApps(t, app, 1)
+       assertQueueApplicationsRunningMetrics(t, app, 1)
+       assertQueueApplicationsAcceptedMetrics(t, app, 1)
+       assertQueueApplicationsRejectedMetrics(t, app, 0)
+       assertQueueApplicationsFailedMetrics(t, app, 0)
+       assertQueueApplicationsCompletedMetrics(t, app, 0)
+       // Starting -> Running
+       err = app.HandleApplicationEvent(RunApplication)
+       assertState(t, app, err, Running.String())
+       assertTotalAppsRunningMetrics(t, 1)
+       assertTotalAppsCompletedMetrics(t, 0)
+       assertTotalAppsRejectedMetrics(t, 0)
+       assertQueueRunningApps(t, app, 1)
+       assertQueueApplicationsRunningMetrics(t, app, 1)
+       assertQueueApplicationsAcceptedMetrics(t, app, 1)
+       assertQueueApplicationsRejectedMetrics(t, app, 0)
+       assertQueueApplicationsFailedMetrics(t, app, 0)
+       assertQueueApplicationsCompletedMetrics(t, app, 0)
+       // Running -> Completing
+       err = app.HandleApplicationEvent(CompleteApplication)
+       assertState(t, app, err, Completing.String())
+       assertTotalAppsRunningMetrics(t, 0)
+       assertTotalAppsCompletedMetrics(t, 0)
+       assertTotalAppsRejectedMetrics(t, 0)
+       assertQueueRunningApps(t, app, 0)
+       assertQueueApplicationsRunningMetrics(t, app, 0)
+       assertQueueApplicationsAcceptedMetrics(t, app, 1)
+       assertQueueApplicationsRejectedMetrics(t, app, 0)
+       assertQueueApplicationsFailedMetrics(t, app, 0)
+       assertQueueApplicationsCompletedMetrics(t, app, 0)
+       // Completing -> Completed
+       err = app.HandleApplicationEvent(CompleteApplication)
+       assertState(t, app, err, Completed.String())
+       assertTotalAppsRunningMetrics(t, 0)
+       assertTotalAppsCompletedMetrics(t, 1)
+       assertTotalAppsRejectedMetrics(t, 0)
+       assertQueueRunningApps(t, app, 0)
+       assertQueueApplicationsRunningMetrics(t, app, 0)
+       assertQueueApplicationsAcceptedMetrics(t, app, 1)
+       assertQueueApplicationsRejectedMetrics(t, app, 0)
+       assertQueueApplicationsFailedMetrics(t, app, 0)
+       assertQueueApplicationsCompletedMetrics(t, app, 1)
+
+       // app-00002: New -> Accepted -> Starting -> Completing -> Running -> 
Failing-> Failed
+       app = newApplication("app-00002", "default", "root.metrics")
+       app.SetQueue(queue)
+       assertState(t, app, nil, New.String())
+       // New -> Accepted
+       err = app.HandleApplicationEvent(RunApplication)
+       assertState(t, app, err, Accepted.String())
+       // Accepted -> Starting
+       err = app.HandleApplicationEvent(RunApplication)
+       assertState(t, app, err, Starting.String())
+       // Starting -> Completing
+       err = app.HandleApplicationEvent(CompleteApplication)
+       assertState(t, app, err, Completing.String())
+       // Completing -> Running
+       err = app.HandleApplicationEvent(RunApplication)
+       assertState(t, app, err, Running.String())
+       // Running -> Failing
+       err = app.HandleApplicationEvent(FailApplication)
+       assertState(t, app, err, Failing.String())
+       // Failing -> Failed
+       err = app.HandleApplicationEvent(FailApplication)
+       assertState(t, app, err, Failed.String())
+       assertTotalAppsRunningMetrics(t, 0)
+       assertTotalAppsCompletedMetrics(t, 1)
+       assertTotalAppsRejectedMetrics(t, 0)
+       assertQueueRunningApps(t, app, 0)
+       assertQueueApplicationsRunningMetrics(t, app, 0)
+       assertQueueApplicationsAcceptedMetrics(t, app, 2)
+       assertQueueApplicationsRejectedMetrics(t, app, 0)
+       assertQueueApplicationsFailedMetrics(t, app, 1)
+       assertQueueApplicationsCompletedMetrics(t, app, 1)
+
+       // app-00003: New -> Accepted -> Starting -> Failing -> Failed
+       app = newApplication("app-00003", "default", "root.metrics")
+       app.SetQueue(queue)
+       assertState(t, app, nil, New.String())
+       // New -> Accepted
+       err = app.HandleApplicationEvent(RunApplication)
+       assertState(t, app, err, Accepted.String())
+       // Accepted -> Starting
+       err = app.HandleApplicationEvent(RunApplication)
+       assertState(t, app, err, Starting.String())
+       // Starting -> Failing
+       err = app.HandleApplicationEvent(FailApplication)
+       assertState(t, app, err, Failing.String())
+       // Failing -> Failed
+       err = app.HandleApplicationEvent(FailApplication)
+       assertState(t, app, err, Failed.String())
+       assertTotalAppsRunningMetrics(t, 0)
+       assertTotalAppsCompletedMetrics(t, 1)
+       assertTotalAppsRejectedMetrics(t, 0)
+       assertQueueRunningApps(t, app, 0)
+       assertQueueApplicationsRunningMetrics(t, app, 0)
+       assertQueueApplicationsAcceptedMetrics(t, app, 3)
+       assertQueueApplicationsRejectedMetrics(t, app, 0)
+       assertQueueApplicationsFailedMetrics(t, app, 2)
+       assertQueueApplicationsCompletedMetrics(t, app, 1)
+
+       // app-00004: New -> Rejected
+       app = newApplication("app-00004", "default", "root.metrics")
+       app.SetQueue(queue)
+       assertState(t, app, nil, New.String())
+       // New -> Rejected
+       err = app.HandleApplicationEvent(RejectApplication)
+       assertState(t, app, err, Rejected.String())
+       assertTotalAppsRunningMetrics(t, 0)
+       assertTotalAppsCompletedMetrics(t, 1)
+       assertTotalAppsRejectedMetrics(t, 1)
+       assertQueueRunningApps(t, app, 0)
+       assertQueueApplicationsRunningMetrics(t, app, 0)
+       assertQueueApplicationsAcceptedMetrics(t, app, 3)
+       assertQueueApplicationsRejectedMetrics(t, app, 1)
+       assertQueueApplicationsFailedMetrics(t, app, 2)
+       assertQueueApplicationsCompletedMetrics(t, app, 1)
+}
+
+func assertState(t testing.TB, app *Application, err error, expected string) {
+       t.Helper()
+       assert.NilError(t, err, fmt.Sprintf("no error expected when change 
state to %v", expected))
+       assert.Equal(t, app.CurrentState(), expected, "application not in 
expected state.")
+}
+
+func assertTotalAppsRunningMetrics(t testing.TB, expected int) {
+       t.Helper()
+       totalAppsRunning, err := 
metrics.GetSchedulerMetrics().GetTotalApplicationsRunning()
+       assert.NilError(t, err, "no error expected when getting total running 
application count.")
+       assert.Equal(t, totalAppsRunning, expected, "total running application 
metrics is not as expected.")
+}
+
+func assertTotalAppsCompletedMetrics(t testing.TB, expected int) {
+       t.Helper()
+       totalAppsCompleted, err := 
metrics.GetSchedulerMetrics().GetTotalApplicationsCompleted()
+       assert.NilError(t, err, "no error expected when getting total completed 
application count.")
+       assert.Equal(t, totalAppsCompleted, expected, "total completed 
application metrics is not as expected.")
+}
+
+func assertTotalAppsRejectedMetrics(t testing.TB, expected int) {
+       t.Helper()
+       totalAppsRejected, err := 
metrics.GetSchedulerMetrics().GetTotalApplicationsRejected()
+       assert.NilError(t, err, "no error expected when getting total rejected 
application count.")
+       assert.Equal(t, totalAppsRejected, expected, "total rejected 
application metrics is not as expected.")
+}
+
+func assertQueueRunningApps(t testing.TB, app *Application, expected int) {
+       t.Helper()
+       runningApps := app.queue.runningApps
+       assert.Equal(t, runningApps, uint64(expected), "total running 
application in queue is not as expected.")
+}
+
+func assertQueueApplicationsAcceptedMetrics(t testing.TB, app *Application, 
expected int) {
+       t.Helper()
+       queueApplicationsAccepted, err := 
metrics.GetQueueMetrics(app.queuePath).GetQueueApplicationsAccepted()
+       assert.NilError(t, err, "no error expected when getting total accepted 
application count in queue.")
+       assert.Equal(t, queueApplicationsAccepted, expected, "total accepted 
application metrics in queue is not as expected.")
+}
+
+func assertQueueApplicationsRejectedMetrics(t testing.TB, app *Application, 
expected int) {
+       t.Helper()
+       queueApplicationsRejected, err := 
metrics.GetQueueMetrics(app.queuePath).GetQueueApplicationsRejected()
+       assert.NilError(t, err, "no error expected when getting total rejected 
application count in queue.")
+       assert.Equal(t, queueApplicationsRejected, expected, "total rejected 
application metrics in queue is not as expected.")
+}
+
+func assertQueueApplicationsRunningMetrics(t testing.TB, app *Application, 
expected int) {
+       t.Helper()
+       queueApplicationsRunning, err := 
metrics.GetQueueMetrics(app.queuePath).GetQueueApplicationsRunning()
+       assert.NilError(t, err, "no error expected when getting total running 
application count in queue.")
+       assert.Equal(t, queueApplicationsRunning, expected, "total running 
application metrics in queue is not as expected.")
+}
+
+func assertQueueApplicationsFailedMetrics(t testing.TB, app *Application, 
expected int) {
+       t.Helper()
+       queueApplicationsFailed, err := 
metrics.GetQueueMetrics(app.queuePath).GetQueueApplicationsFailed()
+       assert.NilError(t, err, "no error expected when getting total failed 
application count in queue.")
+       assert.Equal(t, queueApplicationsFailed, expected, "total failed 
application metrics in queue is not as expected.")
+}
+
+func assertQueueApplicationsCompletedMetrics(t testing.TB, app *Application, 
expected int) {
+       t.Helper()
+       queueApplicationsCompleted, err := 
metrics.GetQueueMetrics(app.queuePath).GetQueueApplicationsCompleted()
+       assert.NilError(t, err, "no error expected when getting total completed 
application count in queue.")
+       assert.Equal(t, queueApplicationsCompleted, expected, "total completed 
application metrics in queue is not as expected.")
+}
+
+func createQueue(t *testing.T, queueName string) *Queue {
+       root, err := createRootQueue(nil)
+       assert.NilError(t, err, "failed to create queue: %v", err)
+       queue, err := createManagedQueue(root, queueName, false, 
map[string]string{"cpu": "10"})
+       assert.NilError(t, err, "failed to create queue: %v", err)
+       return queue
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org
For additional commands, e-mail: issues-h...@yunikorn.apache.org

Reply via email to