This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 12596b0828 refactor(plc4go/spi): clean up interfaces of WorkerPool
12596b0828 is described below
commit 12596b0828dfe18685eebf2602ac8925614dae7e
Author: Sebastian Rühl <[email protected]>
AuthorDate: Tue Mar 21 11:51:08 2023 +0100
refactor(plc4go/spi): clean up interfaces of WorkerPool
---
plc4go/spi/RequestTransactionManager.go | 7 ++++---
plc4go/spi/utils/WorkerPool.go | 27 ++++++++++++++++++---------
2 files changed, 22 insertions(+), 12 deletions(-)
diff --git a/plc4go/spi/RequestTransactionManager.go
b/plc4go/spi/RequestTransactionManager.go
index b914f8b969..a6b5268559 100644
--- a/plc4go/spi/RequestTransactionManager.go
+++ b/plc4go/spi/RequestTransactionManager.go
@@ -21,6 +21,7 @@ package spi
import (
"container/list"
+ "context"
"fmt"
"runtime"
"sync"
@@ -47,7 +48,7 @@ type RequestTransaction struct {
/** The initial operation to perform to kick off the request */
operation utils.Runnable
- completionFuture *utils.CompletionFuture
+ completionFuture utils.CompletionFuture
transactionLog zerolog.Logger
}
@@ -221,11 +222,11 @@ func (t *RequestTransaction) Submit(operation
utils.Runnable) {
}
// AwaitCompletion wait for this RequestTransaction to finish. Returns an
error if it finished unsuccessful
-func (t *RequestTransaction) AwaitCompletion() error {
+func (t *RequestTransaction) AwaitCompletion(ctx context.Context) error {
for t.completionFuture == nil {
time.Sleep(time.Millisecond * 10)
}
- if err := t.completionFuture.AwaitCompletion(); err != nil {
+ if err := t.completionFuture.AwaitCompletion(ctx); err != nil {
return err
}
stillActive := true
diff --git a/plc4go/spi/utils/WorkerPool.go b/plc4go/spi/utils/WorkerPool.go
index 38d74d00a2..9dbb32b885 100644
--- a/plc4go/spi/utils/WorkerPool.go
+++ b/plc4go/spi/utils/WorkerPool.go
@@ -20,6 +20,7 @@
package utils
import (
+ "context"
"fmt"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
@@ -63,7 +64,7 @@ func (w *Worker) work() {
} else {
workerLog.Debug().Msgf("Running work item %v",
workItem)
workItem.runnable()
- workItem.completionFuture.Complete()
+ workItem.completionFuture.complete()
workerLog.Debug().Msgf("work item %v
completed", workItem)
}
default:
@@ -76,7 +77,7 @@ func (w *Worker) work() {
type WorkItem struct {
workItemId int32
runnable Runnable
- completionFuture *CompletionFuture
+ completionFuture *future
}
func (w *WorkItem) String() string {
@@ -125,9 +126,9 @@ func WithExecutorOptionTracerWorkers(traceWorkers bool)
ExecutorOption {
}
}
-func (e *Executor) Submit(workItemId int32, runnable Runnable)
*CompletionFuture {
+func (e *Executor) Submit(workItemId int32, runnable Runnable)
CompletionFuture {
log.Trace().Int32("workItemId", workItemId).Msg("Submitting runnable")
- completionFuture := &CompletionFuture{}
+ completionFuture := &future{}
// TODO: add select and timeout if queue is full
e.queue <- WorkItem{
workItemId: workItemId,
@@ -168,7 +169,12 @@ func (e *Executor) Stop() {
e.running = false
}
-type CompletionFuture struct {
+type CompletionFuture interface {
+ AwaitCompletion(ctx context.Context) error
+ Cancel(interrupt bool, err error)
+}
+
+type future struct {
cancelRequested bool
interruptRequested bool
completed bool
@@ -176,20 +182,23 @@ type CompletionFuture struct {
err error
}
-func (f *CompletionFuture) Cancel(interrupt bool, err error) {
+func (f *future) Cancel(interrupt bool, err error) {
f.cancelRequested = true
f.interruptRequested = interrupt
f.errored = true
f.err = err
}
-func (f *CompletionFuture) Complete() {
+func (f *future) complete() {
f.completed = true
}
-func (f *CompletionFuture) AwaitCompletion() error {
- for !f.completed && !f.errored {
+// AwaitCompletion polls every 10ms until the future is marked completed, is
+// cancelled via Cancel, or ctx is done.
+//
+// It returns ctx.Err() if the context has ended, otherwise f.err (the error
+// handed to Cancel; nil when the work item simply completed).
+func (f *future) AwaitCompletion(ctx context.Context) error {
+ // Keep waiting only while the context is still live. The guard must be
+ // ctx.Err() == nil: with != nil the loop is skipped for every healthy
+ // context and the caller returns before the work item has finished.
+ for !f.completed && !f.errored && ctx.Err() == nil {
time.Sleep(time.Millisecond * 10)
}
+ if err := ctx.Err(); err != nil {
+ return err
+ }
return f.err
}