This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 7efcb36f3e refactor(plc4go/spi): generify WorkerPool
7efcb36f3e is described below
commit 7efcb36f3ea0d61fb0e44f52902e23e6b843122c
Author: Sebastian Rühl <[email protected]>
AuthorDate: Tue Mar 21 11:42:50 2023 +0100
refactor(plc4go/spi): generify WorkerPool
---
plc4go/spi/RequestTransactionManager.go | 2 +-
plc4go/spi/utils/WorkerPool.go | 45 ++++++++++++++++++++-------------
2 files changed, 29 insertions(+), 18 deletions(-)
diff --git a/plc4go/spi/RequestTransactionManager.go
b/plc4go/spi/RequestTransactionManager.go
index 10c72f5ac7..b914f8b969 100644
--- a/plc4go/spi/RequestTransactionManager.go
+++ b/plc4go/spi/RequestTransactionManager.go
@@ -37,7 +37,7 @@ import (
var sharedExecutorInstance utils.Executor // shared instance
func init() {
- sharedExecutorInstance = *utils.NewFixedSizeExecutor(runtime.NumCPU())
+ sharedExecutorInstance = *utils.NewFixedSizeExecutor(runtime.NumCPU(),
utils.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers))
sharedExecutorInstance.Start()
}
diff --git a/plc4go/spi/utils/WorkerPool.go b/plc4go/spi/utils/WorkerPool.go
index e24fdcc8f7..38d74d00a2 100644
--- a/plc4go/spi/utils/WorkerPool.go
+++ b/plc4go/spi/utils/WorkerPool.go
@@ -21,7 +21,6 @@ package utils
import (
"fmt"
- "github.com/apache/plc4x/plc4go/pkg/api/config"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"sync"
@@ -49,7 +48,7 @@ func (w *Worker) work() {
}
}()
workerLog := log.With().Int("Worker id", w.id).Logger()
- if !config.TraceTransactionManagerWorkers {
+ if !w.executor.traceWorkers {
workerLog = zerolog.Nop()
}
@@ -75,24 +74,25 @@ func (w *Worker) work() {
}
type WorkItem struct {
- transactionId int32
+ workItemId int32
runnable Runnable
completionFuture *CompletionFuture
}
func (w *WorkItem) String() string {
- return fmt.Sprintf("Workitem{tid:%d}", w.transactionId)
+ return fmt.Sprintf("Workitem{wid:%d}", w.workItemId)
}
type Executor struct {
- running bool
- shutdown bool
- stateChange sync.Mutex
- worker []*Worker
- queue chan WorkItem
+ running bool
+ shutdown bool
+ stateChange sync.Mutex
+ worker []*Worker
+ queue chan WorkItem
+ traceWorkers bool
}
-func NewFixedSizeExecutor(numberOfWorkers int) *Executor {
+func NewFixedSizeExecutor(numberOfWorkers int, options ...ExecutorOption)
*Executor {
workers := make([]*Worker, numberOfWorkers)
for i := 0; i < numberOfWorkers; i++ {
workers[i] = &Worker{
@@ -103,27 +103,38 @@ func NewFixedSizeExecutor(numberOfWorkers int) *Executor {
executor: nil,
}
}
- executor := Executor{
+ executor := &Executor{
queue: make(chan WorkItem, 100),
worker: workers,
}
+ for _, option := range options {
+ option(executor)
+ }
for i := 0; i < numberOfWorkers; i++ {
worker := workers[i]
- worker.executor = &executor
+ worker.executor = executor
+ }
+ return executor
+}
+
+type ExecutorOption func(*Executor)
+
+func WithExecutorOptionTracerWorkers(traceWorkers bool) ExecutorOption {
+ return func(executor *Executor) {
+ executor.traceWorkers = traceWorkers
}
- return &executor
}
-func (e *Executor) Submit(transactionId int32, runnable Runnable)
*CompletionFuture {
- log.Trace().Int32("transactionId", transactionId).Msg("Submitting
runnable")
+func (e *Executor) Submit(workItemId int32, runnable Runnable)
*CompletionFuture {
+ log.Trace().Int32("workItemId", workItemId).Msg("Submitting runnable")
completionFuture := &CompletionFuture{}
// TODO: add select and timeout if queue is full
e.queue <- WorkItem{
- transactionId: transactionId,
+ workItemId: workItemId,
runnable: runnable,
completionFuture: completionFuture,
}
- log.Trace().Int32("transactionId", transactionId).Msg("runnable queued")
+ log.Trace().Int32("workItemId", workItemId).Msg("runnable queued")
return completionFuture
}