This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 12596b0828 refactor(plc4go/spi): clean up interfaces of WorkerPool
12596b0828 is described below

commit 12596b0828dfe18685eebf2602ac8925614dae7e
Author: Sebastian Rühl <[email protected]>
AuthorDate: Tue Mar 21 11:51:08 2023 +0100

    refactor(plc4go/spi): clean up interfaces of WorkerPool
---
 plc4go/spi/RequestTransactionManager.go |  7 ++++---
 plc4go/spi/utils/WorkerPool.go          | 27 ++++++++++++++++++---------
 2 files changed, 22 insertions(+), 12 deletions(-)

diff --git a/plc4go/spi/RequestTransactionManager.go b/plc4go/spi/RequestTransactionManager.go
index b914f8b969..a6b5268559 100644
--- a/plc4go/spi/RequestTransactionManager.go
+++ b/plc4go/spi/RequestTransactionManager.go
@@ -21,6 +21,7 @@ package spi
 
 import (
        "container/list"
+       "context"
        "fmt"
        "runtime"
        "sync"
@@ -47,7 +48,7 @@ type RequestTransaction struct {
 
        /** The initial operation to perform to kick off the request */
        operation        utils.Runnable
-       completionFuture *utils.CompletionFuture
+       completionFuture utils.CompletionFuture
 
        transactionLog zerolog.Logger
 }
@@ -221,11 +222,11 @@ func (t *RequestTransaction) Submit(operation utils.Runnable) {
 }
 
 // AwaitCompletion wait for this RequestTransaction to finish. Returns an error if it finished unsuccessful
-func (t *RequestTransaction) AwaitCompletion() error {
+func (t *RequestTransaction) AwaitCompletion(ctx context.Context) error {
        for t.completionFuture == nil {
                time.Sleep(time.Millisecond * 10)
        }
-       if err := t.completionFuture.AwaitCompletion(); err != nil {
+       if err := t.completionFuture.AwaitCompletion(ctx); err != nil {
                return err
        }
        stillActive := true
diff --git a/plc4go/spi/utils/WorkerPool.go b/plc4go/spi/utils/WorkerPool.go
index 38d74d00a2..9dbb32b885 100644
--- a/plc4go/spi/utils/WorkerPool.go
+++ b/plc4go/spi/utils/WorkerPool.go
@@ -20,6 +20,7 @@
 package utils
 
 import (
+       "context"
        "fmt"
        "github.com/rs/zerolog"
        "github.com/rs/zerolog/log"
@@ -63,7 +64,7 @@ func (w *Worker) work() {
                        } else {
                                workerLog.Debug().Msgf("Running work item %v", workItem)
                                workItem.runnable()
-                               workItem.completionFuture.Complete()
+                               workItem.completionFuture.complete()
                                workerLog.Debug().Msgf("work item %v completed", workItem)
                        }
                default:
@@ -76,7 +77,7 @@ func (w *Worker) work() {
 type WorkItem struct {
        workItemId       int32
        runnable         Runnable
-       completionFuture *CompletionFuture
+       completionFuture *future
 }
 
 func (w *WorkItem) String() string {
@@ -125,9 +126,9 @@ func WithExecutorOptionTracerWorkers(traceWorkers bool) ExecutorOption {
        }
 }
 
-func (e *Executor) Submit(workItemId int32, runnable Runnable) *CompletionFuture {
+func (e *Executor) Submit(workItemId int32, runnable Runnable) CompletionFuture {
        log.Trace().Int32("workItemId", workItemId).Msg("Submitting runnable")
-       completionFuture := &CompletionFuture{}
+       completionFuture := &future{}
        // TODO: add select and timeout if queue is full
        e.queue <- WorkItem{
                workItemId:       workItemId,
@@ -168,7 +169,12 @@ func (e *Executor) Stop() {
        e.running = false
 }
 
-type CompletionFuture struct {
+type CompletionFuture interface {
+       AwaitCompletion(ctx context.Context) error
+       Cancel(interrupt bool, err error)
+}
+
+type future struct {
        cancelRequested    bool
        interruptRequested bool
        completed          bool
@@ -176,20 +182,23 @@ type CompletionFuture struct {
        err                error
 }
 
-func (f *CompletionFuture) Cancel(interrupt bool, err error) {
+func (f *future) Cancel(interrupt bool, err error) {
        f.cancelRequested = true
        f.interruptRequested = interrupt
        f.errored = true
        f.err = err
 }
 
-func (f *CompletionFuture) Complete() {
+func (f *future) complete() {
        f.completed = true
 }
 
-func (f *CompletionFuture) AwaitCompletion() error {
-       for !f.completed && !f.errored {
+func (f *future) AwaitCompletion(ctx context.Context) error {
+       for !f.completed && !f.errored && ctx.Err() == nil { // keep polling only while ctx is still live
                time.Sleep(time.Millisecond * 10)
        }
+       if err := ctx.Err(); err != nil { // a cancelled or expired ctx is reported before f.err
+               return err
+       }
        return f.err
 }

Reply via email to