This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 23f19dbd4c refactor(plc4go/spi): move worker related code into
WorkerPool
23f19dbd4c is described below
commit 23f19dbd4c4b4625034e825320c0d24966239e64
Author: Sebastian Rühl <[email protected]>
AuthorDate: Tue Mar 21 11:37:23 2023 +0100
refactor(plc4go/spi): move worker related code into WorkerPool
---
plc4go/spi/RequestTransactionManager.go | 184 +++-----------------------------
plc4go/spi/utils/WorkerPool.go | 184 ++++++++++++++++++++++++++++++++
2 files changed, 200 insertions(+), 168 deletions(-)
diff --git a/plc4go/spi/RequestTransactionManager.go
b/plc4go/spi/RequestTransactionManager.go
index a6b0255021..10c72f5ac7 100644
--- a/plc4go/spi/RequestTransactionManager.go
+++ b/plc4go/spi/RequestTransactionManager.go
@@ -22,175 +22,23 @@ package spi
import (
"container/list"
"fmt"
+ "runtime"
+ "sync"
+ "time"
+
"github.com/apache/plc4x/plc4go/pkg/api/config"
+ "github.com/apache/plc4x/plc4go/spi/utils"
+
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
- "runtime"
- "sync"
- "time"
)
-var sharedExecutorInstance Executor // shared instance
+var sharedExecutorInstance utils.Executor // shared instance
func init() {
- sharedExecutorInstance = *NewFixedSizeExecutor(runtime.NumCPU())
- sharedExecutorInstance.start()
-}
-
-type Runnable func()
-
-type Worker struct {
- id int
- shutdown bool
- runnable Runnable
- interrupted bool
- executor *Executor
-}
-
-func (w *Worker) work() {
- defer func() {
- if recovered := recover(); recovered != nil {
- log.Error().Msgf("Recovering from panic()=%v",
recovered)
- }
- if !w.shutdown {
- // TODO: if we are not in shutdown we continue
- w.work()
- }
- }()
- workerLog := log.With().Int("Worker id", w.id).Logger()
- if !config.TraceTransactionManagerWorkers {
- workerLog = zerolog.Nop()
- }
-
- for !w.shutdown {
- workerLog.Debug().Msg("Working")
- select {
- case workItem := <-w.executor.queue:
- workerLog.Debug().Msgf("Got work item %v", workItem)
- if workItem.completionFuture.cancelRequested ||
(w.shutdown && w.interrupted) {
- workerLog.Debug().Msg("We need to stop")
- // TODO: do we need to complete with a error?
- } else {
- workerLog.Debug().Msgf("Running work item %v",
workItem)
- workItem.runnable()
- workItem.completionFuture.complete()
- workerLog.Debug().Msgf("work item %v
completed", workItem)
- }
- default:
- workerLog.Debug().Msgf("Idling")
- time.Sleep(time.Millisecond * 10)
- }
- }
-}
-
-type WorkItem struct {
- transactionId int32
- runnable Runnable
- completionFuture *CompletionFuture
-}
-
-func (w *WorkItem) String() string {
- return fmt.Sprintf("Workitem{tid:%d}", w.transactionId)
-}
-
-type Executor struct {
- running bool
- shutdown bool
- stateChange sync.Mutex
- worker []*Worker
- queue chan WorkItem
-}
-
-func NewFixedSizeExecutor(numberOfWorkers int) *Executor {
- workers := make([]*Worker, numberOfWorkers)
- for i := 0; i < numberOfWorkers; i++ {
- workers[i] = &Worker{
- id: i,
- shutdown: false,
- runnable: nil,
- interrupted: false,
- executor: nil,
- }
- }
- executor := Executor{
- queue: make(chan WorkItem, 100),
- worker: workers,
- }
- for i := 0; i < numberOfWorkers; i++ {
- worker := workers[i]
- worker.executor = &executor
- }
- return &executor
-}
-
-func (e *Executor) submit(transactionId int32, runnable Runnable)
*CompletionFuture {
- log.Trace().Int32("transactionId", transactionId).Msg("Submitting
runnable")
- completionFuture := &CompletionFuture{}
- // TODO: add select and timeout if queue is full
- e.queue <- WorkItem{
- transactionId: transactionId,
- runnable: runnable,
- completionFuture: completionFuture,
- }
- log.Trace().Int32("transactionId", transactionId).Msg("runnable queued")
- return completionFuture
-}
-
-func (e *Executor) start() {
- e.stateChange.Lock()
- defer e.stateChange.Unlock()
- if e.running {
- return
- }
- e.running = true
- e.shutdown = false
- for i := 0; i < len(e.worker); i++ {
- worker := e.worker[i]
- go worker.work()
- }
-}
-
-func (e *Executor) stop() {
- e.stateChange.Lock()
- defer e.stateChange.Unlock()
- if !e.running {
- return
- }
- e.shutdown = true
- close(e.queue)
- for i := 0; i < len(e.worker); i++ {
- worker := e.worker[i]
- worker.shutdown = true
- worker.interrupted = true
- }
- e.running = false
-}
-
-type CompletionFuture struct {
- cancelRequested bool
- interruptRequested bool
- completed bool
- errored bool
- err error
-}
-
-func (f *CompletionFuture) cancel(interrupt bool, err error) {
- f.cancelRequested = true
- f.interruptRequested = interrupt
- f.errored = true
- f.err = err
-}
-
-func (f *CompletionFuture) complete() {
- f.completed = true
-}
-
-func (f *CompletionFuture) AwaitCompletion() error {
- for !f.completed && !f.errored {
- time.Sleep(time.Millisecond * 10)
- }
- return f.err
+ sharedExecutorInstance = *utils.NewFixedSizeExecutor(runtime.NumCPU())
+ sharedExecutorInstance.Start()
}
type RequestTransaction struct {
@@ -198,8 +46,8 @@ type RequestTransaction struct {
transactionId int32
/** The initial operation to perform to kick off the request */
- operation Runnable
- completionFuture *CompletionFuture
+ operation utils.Runnable
+ completionFuture *utils.CompletionFuture
transactionLog zerolog.Logger
}
@@ -219,7 +67,7 @@ type RequestTransactionManager struct {
// Important, this is a FIFO Queue for Fairness!
workLog list.List
workLogMutex sync.RWMutex
- executor *Executor
+ executor *utils.Executor
}
// NewRequestTransactionManager creates a new RequestTransactionManager
@@ -239,7 +87,7 @@ func NewRequestTransactionManager(numberOfConcurrentRequests
int, requestTransac
type RequestTransactionManagerOption func(requestTransactionManager
*RequestTransactionManager)
// WithCustomExecutor sets a custom Executor for the RequestTransactionManager
-func WithCustomExecutor(executor *Executor) RequestTransactionManagerOption {
+func WithCustomExecutor(executor *utils.Executor)
RequestTransactionManagerOption {
return func(requestTransactionManager *RequestTransactionManager) {
requestTransactionManager.executor = executor
}
@@ -286,7 +134,7 @@ func (r *RequestTransactionManager) processWorklog() {
log.Debug().Msgf("Handling next %v. (Adding to running requests
(length: %d))", next, len(r.runningRequests))
r.runningRequests = append(r.runningRequests, next)
// TODO: use sharedInstance if none is present
- completionFuture :=
sharedExecutorInstance.submit(next.transactionId, next.operation)
+ completionFuture :=
sharedExecutorInstance.Submit(next.transactionId, next.operation)
next.completionFuture = completionFuture
r.workLog.Remove(front)
}
@@ -317,7 +165,7 @@ func (r *RequestTransactionManager)
getNumberOfActiveRequests() int {
func (r *RequestTransactionManager) failRequest(transaction
*RequestTransaction, err error) error {
// Try to fail it!
- transaction.completionFuture.cancel(true, err)
+ transaction.completionFuture.Cancel(true, err)
// End it
return r.endRequest(transaction)
}
@@ -359,7 +207,7 @@ func (t *RequestTransaction) EndRequest() error {
}
// Submit submits a Runnable to the RequestTransactionManager
-func (t *RequestTransaction) Submit(operation Runnable) {
+func (t *RequestTransaction) Submit(operation utils.Runnable) {
if t.operation != nil {
panic("Operation already set")
}
diff --git a/plc4go/spi/utils/WorkerPool.go b/plc4go/spi/utils/WorkerPool.go
new file mode 100644
index 0000000000..e24fdcc8f7
--- /dev/null
+++ b/plc4go/spi/utils/WorkerPool.go
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package utils
+
+import (
+ "fmt"
+ "github.com/apache/plc4x/plc4go/pkg/api/config"
+ "github.com/rs/zerolog"
+ "github.com/rs/zerolog/log"
+ "sync"
+ "time"
+)
+
+type Runnable func()
+
+type Worker struct {
+ id int
+ shutdown bool
+ runnable Runnable
+ interrupted bool
+ executor *Executor
+}
+
+func (w *Worker) work() {
+ defer func() {
+ if recovered := recover(); recovered != nil {
+ log.Error().Msgf("Recovering from panic()=%v",
recovered)
+ }
+ if !w.shutdown {
+ // if we are not in shutdown we continue
+ w.work()
+ }
+ }()
+ workerLog := log.With().Int("Worker id", w.id).Logger()
+ if !config.TraceTransactionManagerWorkers {
+ workerLog = zerolog.Nop()
+ }
+
+ for !w.shutdown {
+ workerLog.Debug().Msg("Working")
+ select {
+ case workItem := <-w.executor.queue:
+ workerLog.Debug().Msgf("Got work item %v", workItem)
+ if workItem.completionFuture.cancelRequested ||
(w.shutdown && w.interrupted) {
+ workerLog.Debug().Msg("We need to stop")
+ // TODO: do we need to complete with a error?
+ } else {
+ workerLog.Debug().Msgf("Running work item %v",
workItem)
+ workItem.runnable()
+ workItem.completionFuture.Complete()
+ workerLog.Debug().Msgf("work item %v
completed", workItem)
+ }
+ default:
+ workerLog.Debug().Msgf("Idling")
+ time.Sleep(time.Millisecond * 10)
+ }
+ }
+}
+
+type WorkItem struct {
+ transactionId int32
+ runnable Runnable
+ completionFuture *CompletionFuture
+}
+
+func (w *WorkItem) String() string {
+ return fmt.Sprintf("Workitem{tid:%d}", w.transactionId)
+}
+
+type Executor struct {
+ running bool
+ shutdown bool
+ stateChange sync.Mutex
+ worker []*Worker
+ queue chan WorkItem
+}
+
+func NewFixedSizeExecutor(numberOfWorkers int) *Executor {
+ workers := make([]*Worker, numberOfWorkers)
+ for i := 0; i < numberOfWorkers; i++ {
+ workers[i] = &Worker{
+ id: i,
+ shutdown: false,
+ runnable: nil,
+ interrupted: false,
+ executor: nil,
+ }
+ }
+ executor := Executor{
+ queue: make(chan WorkItem, 100),
+ worker: workers,
+ }
+ for i := 0; i < numberOfWorkers; i++ {
+ worker := workers[i]
+ worker.executor = &executor
+ }
+ return &executor
+}
+
+func (e *Executor) Submit(transactionId int32, runnable Runnable)
*CompletionFuture {
+ log.Trace().Int32("transactionId", transactionId).Msg("Submitting
runnable")
+ completionFuture := &CompletionFuture{}
+ // TODO: add select and timeout if queue is full
+ e.queue <- WorkItem{
+ transactionId: transactionId,
+ runnable: runnable,
+ completionFuture: completionFuture,
+ }
+ log.Trace().Int32("transactionId", transactionId).Msg("runnable queued")
+ return completionFuture
+}
+
+func (e *Executor) Start() {
+ e.stateChange.Lock()
+ defer e.stateChange.Unlock()
+ if e.running {
+ return
+ }
+ e.running = true
+ e.shutdown = false
+ for i := 0; i < len(e.worker); i++ {
+ worker := e.worker[i]
+ go worker.work()
+ }
+}
+
+func (e *Executor) Stop() {
+ e.stateChange.Lock()
+ defer e.stateChange.Unlock()
+ if !e.running {
+ return
+ }
+ e.shutdown = true
+ close(e.queue)
+ for i := 0; i < len(e.worker); i++ {
+ worker := e.worker[i]
+ worker.shutdown = true
+ worker.interrupted = true
+ }
+ e.running = false
+}
+
+type CompletionFuture struct {
+ cancelRequested bool
+ interruptRequested bool
+ completed bool
+ errored bool
+ err error
+}
+
+func (f *CompletionFuture) Cancel(interrupt bool, err error) {
+ f.cancelRequested = true
+ f.interruptRequested = interrupt
+ f.errored = true
+ f.err = err
+}
+
+func (f *CompletionFuture) Complete() {
+ f.completed = true
+}
+
+func (f *CompletionFuture) AwaitCompletion() error {
+ for !f.completed && !f.errored {
+ time.Sleep(time.Millisecond * 10)
+ }
+ return f.err
+}