This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 23f19dbd4c refactor(plc4go/spi): move worker related code into WorkerPool
23f19dbd4c is described below

commit 23f19dbd4c4b4625034e825320c0d24966239e64
Author: Sebastian Rühl <[email protected]>
AuthorDate: Tue Mar 21 11:37:23 2023 +0100

    refactor(plc4go/spi): move worker related code into WorkerPool
---
 plc4go/spi/RequestTransactionManager.go | 184 +++-----------------------------
 plc4go/spi/utils/WorkerPool.go          | 184 ++++++++++++++++++++++++++++++++
 2 files changed, 200 insertions(+), 168 deletions(-)

diff --git a/plc4go/spi/RequestTransactionManager.go b/plc4go/spi/RequestTransactionManager.go
index a6b0255021..10c72f5ac7 100644
--- a/plc4go/spi/RequestTransactionManager.go
+++ b/plc4go/spi/RequestTransactionManager.go
@@ -22,175 +22,23 @@ package spi
 import (
        "container/list"
        "fmt"
+       "runtime"
+       "sync"
+       "time"
+
        "github.com/apache/plc4x/plc4go/pkg/api/config"
+       "github.com/apache/plc4x/plc4go/spi/utils"
+
        "github.com/pkg/errors"
        "github.com/rs/zerolog"
        "github.com/rs/zerolog/log"
-       "runtime"
-       "sync"
-       "time"
 )
 
-var sharedExecutorInstance Executor // shared instance
+var sharedExecutorInstance utils.Executor // shared instance
 
 func init() {
-       sharedExecutorInstance = *NewFixedSizeExecutor(runtime.NumCPU())
-       sharedExecutorInstance.start()
-}
-
-type Runnable func()
-
-type Worker struct {
-       id          int
-       shutdown    bool
-       runnable    Runnable
-       interrupted bool
-       executor    *Executor
-}
-
-func (w *Worker) work() {
-       defer func() {
-               if recovered := recover(); recovered != nil {
-                       log.Error().Msgf("Recovering from panic()=%v", 
recovered)
-               }
-               if !w.shutdown {
-                       // TODO: if we are not in shutdown we continue
-                       w.work()
-               }
-       }()
-       workerLog := log.With().Int("Worker id", w.id).Logger()
-       if !config.TraceTransactionManagerWorkers {
-               workerLog = zerolog.Nop()
-       }
-
-       for !w.shutdown {
-               workerLog.Debug().Msg("Working")
-               select {
-               case workItem := <-w.executor.queue:
-                       workerLog.Debug().Msgf("Got work item %v", workItem)
-                       if workItem.completionFuture.cancelRequested || 
(w.shutdown && w.interrupted) {
-                               workerLog.Debug().Msg("We need to stop")
-                               // TODO: do we need to complete with a error?
-                       } else {
-                               workerLog.Debug().Msgf("Running work item %v", 
workItem)
-                               workItem.runnable()
-                               workItem.completionFuture.complete()
-                               workerLog.Debug().Msgf("work item %v 
completed", workItem)
-                       }
-               default:
-                       workerLog.Debug().Msgf("Idling")
-                       time.Sleep(time.Millisecond * 10)
-               }
-       }
-}
-
-type WorkItem struct {
-       transactionId    int32
-       runnable         Runnable
-       completionFuture *CompletionFuture
-}
-
-func (w *WorkItem) String() string {
-       return fmt.Sprintf("Workitem{tid:%d}", w.transactionId)
-}
-
-type Executor struct {
-       running     bool
-       shutdown    bool
-       stateChange sync.Mutex
-       worker      []*Worker
-       queue       chan WorkItem
-}
-
-func NewFixedSizeExecutor(numberOfWorkers int) *Executor {
-       workers := make([]*Worker, numberOfWorkers)
-       for i := 0; i < numberOfWorkers; i++ {
-               workers[i] = &Worker{
-                       id:          i,
-                       shutdown:    false,
-                       runnable:    nil,
-                       interrupted: false,
-                       executor:    nil,
-               }
-       }
-       executor := Executor{
-               queue:  make(chan WorkItem, 100),
-               worker: workers,
-       }
-       for i := 0; i < numberOfWorkers; i++ {
-               worker := workers[i]
-               worker.executor = &executor
-       }
-       return &executor
-}
-
-func (e *Executor) submit(transactionId int32, runnable Runnable) 
*CompletionFuture {
-       log.Trace().Int32("transactionId", transactionId).Msg("Submitting 
runnable")
-       completionFuture := &CompletionFuture{}
-       // TODO: add select and timeout if queue is full
-       e.queue <- WorkItem{
-               transactionId:    transactionId,
-               runnable:         runnable,
-               completionFuture: completionFuture,
-       }
-       log.Trace().Int32("transactionId", transactionId).Msg("runnable queued")
-       return completionFuture
-}
-
-func (e *Executor) start() {
-       e.stateChange.Lock()
-       defer e.stateChange.Unlock()
-       if e.running {
-               return
-       }
-       e.running = true
-       e.shutdown = false
-       for i := 0; i < len(e.worker); i++ {
-               worker := e.worker[i]
-               go worker.work()
-       }
-}
-
-func (e *Executor) stop() {
-       e.stateChange.Lock()
-       defer e.stateChange.Unlock()
-       if !e.running {
-               return
-       }
-       e.shutdown = true
-       close(e.queue)
-       for i := 0; i < len(e.worker); i++ {
-               worker := e.worker[i]
-               worker.shutdown = true
-               worker.interrupted = true
-       }
-       e.running = false
-}
-
-type CompletionFuture struct {
-       cancelRequested    bool
-       interruptRequested bool
-       completed          bool
-       errored            bool
-       err                error
-}
-
-func (f *CompletionFuture) cancel(interrupt bool, err error) {
-       f.cancelRequested = true
-       f.interruptRequested = interrupt
-       f.errored = true
-       f.err = err
-}
-
-func (f *CompletionFuture) complete() {
-       f.completed = true
-}
-
-func (f *CompletionFuture) AwaitCompletion() error {
-       for !f.completed && !f.errored {
-               time.Sleep(time.Millisecond * 10)
-       }
-       return f.err
+       sharedExecutorInstance = *utils.NewFixedSizeExecutor(runtime.NumCPU())
+       sharedExecutorInstance.Start()
 }
 
 type RequestTransaction struct {
@@ -198,8 +46,8 @@ type RequestTransaction struct {
        transactionId int32
 
        /** The initial operation to perform to kick off the request */
-       operation        Runnable
-       completionFuture *CompletionFuture
+       operation        utils.Runnable
+       completionFuture *utils.CompletionFuture
 
        transactionLog zerolog.Logger
 }
@@ -219,7 +67,7 @@ type RequestTransactionManager struct {
        // Important, this is a FIFO Queue for Fairness!
        workLog      list.List
        workLogMutex sync.RWMutex
-       executor     *Executor
+       executor     *utils.Executor
 }
 
 // NewRequestTransactionManager creates a new RequestTransactionManager
@@ -239,7 +87,7 @@ func NewRequestTransactionManager(numberOfConcurrentRequests 
int, requestTransac
 type RequestTransactionManagerOption func(requestTransactionManager 
*RequestTransactionManager)
 
 // WithCustomExecutor sets a custom Executor for the RequestTransactionManager
-func WithCustomExecutor(executor *Executor) RequestTransactionManagerOption {
+func WithCustomExecutor(executor *utils.Executor) 
RequestTransactionManagerOption {
        return func(requestTransactionManager *RequestTransactionManager) {
                requestTransactionManager.executor = executor
        }
@@ -286,7 +134,7 @@ func (r *RequestTransactionManager) processWorklog() {
                log.Debug().Msgf("Handling next %v. (Adding to running requests 
(length: %d))", next, len(r.runningRequests))
                r.runningRequests = append(r.runningRequests, next)
                // TODO: use sharedInstance if none is present
-               completionFuture := 
sharedExecutorInstance.submit(next.transactionId, next.operation)
+               completionFuture := 
sharedExecutorInstance.Submit(next.transactionId, next.operation)
                next.completionFuture = completionFuture
                r.workLog.Remove(front)
        }
@@ -317,7 +165,7 @@ func (r *RequestTransactionManager) 
getNumberOfActiveRequests() int {
 
 func (r *RequestTransactionManager) failRequest(transaction 
*RequestTransaction, err error) error {
        // Try to fail it!
-       transaction.completionFuture.cancel(true, err)
+       transaction.completionFuture.Cancel(true, err)
        // End it
        return r.endRequest(transaction)
 }
@@ -359,7 +207,7 @@ func (t *RequestTransaction) EndRequest() error {
 }
 
 // Submit submits a Runnable to the RequestTransactionManager
-func (t *RequestTransaction) Submit(operation Runnable) {
+func (t *RequestTransaction) Submit(operation utils.Runnable) {
        if t.operation != nil {
                panic("Operation already set")
        }
diff --git a/plc4go/spi/utils/WorkerPool.go b/plc4go/spi/utils/WorkerPool.go
new file mode 100644
index 0000000000..e24fdcc8f7
--- /dev/null
+++ b/plc4go/spi/utils/WorkerPool.go
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package utils
+
+import (
+       "fmt"
+       "github.com/apache/plc4x/plc4go/pkg/api/config"
+       "github.com/rs/zerolog"
+       "github.com/rs/zerolog/log"
+       "sync"
+       "time"
+)
+
+type Runnable func()
+
+type Worker struct {
+       id          int
+       shutdown    bool
+       runnable    Runnable
+       interrupted bool
+       executor    *Executor
+}
+
+func (w *Worker) work() {
+       defer func() {
+               if recovered := recover(); recovered != nil {
+                       log.Error().Msgf("Recovering from panic()=%v", 
recovered)
+               }
+               if !w.shutdown {
+                       // if we are not in shutdown we continue
+                       w.work()
+               }
+       }()
+       workerLog := log.With().Int("Worker id", w.id).Logger()
+       if !config.TraceTransactionManagerWorkers {
+               workerLog = zerolog.Nop()
+       }
+
+       for !w.shutdown {
+               workerLog.Debug().Msg("Working")
+               select {
+               case workItem := <-w.executor.queue:
+                       workerLog.Debug().Msgf("Got work item %v", workItem)
+                       if workItem.completionFuture.cancelRequested || 
(w.shutdown && w.interrupted) {
+                               workerLog.Debug().Msg("We need to stop")
+                               // TODO: do we need to complete with a error?
+                       } else {
+                               workerLog.Debug().Msgf("Running work item %v", 
workItem)
+                               workItem.runnable()
+                               workItem.completionFuture.Complete()
+                               workerLog.Debug().Msgf("work item %v 
completed", workItem)
+                       }
+               default:
+                       workerLog.Debug().Msgf("Idling")
+                       time.Sleep(time.Millisecond * 10)
+               }
+       }
+}
+
+type WorkItem struct {
+       transactionId    int32
+       runnable         Runnable
+       completionFuture *CompletionFuture
+}
+
+func (w *WorkItem) String() string {
+       return fmt.Sprintf("Workitem{tid:%d}", w.transactionId)
+}
+
+type Executor struct {
+       running     bool
+       shutdown    bool
+       stateChange sync.Mutex
+       worker      []*Worker
+       queue       chan WorkItem
+}
+
+func NewFixedSizeExecutor(numberOfWorkers int) *Executor {
+       workers := make([]*Worker, numberOfWorkers)
+       for i := 0; i < numberOfWorkers; i++ {
+               workers[i] = &Worker{
+                       id:          i,
+                       shutdown:    false,
+                       runnable:    nil,
+                       interrupted: false,
+                       executor:    nil,
+               }
+       }
+       executor := Executor{
+               queue:  make(chan WorkItem, 100),
+               worker: workers,
+       }
+       for i := 0; i < numberOfWorkers; i++ {
+               worker := workers[i]
+               worker.executor = &executor
+       }
+       return &executor
+}
+
+func (e *Executor) Submit(transactionId int32, runnable Runnable) 
*CompletionFuture {
+       log.Trace().Int32("transactionId", transactionId).Msg("Submitting 
runnable")
+       completionFuture := &CompletionFuture{}
+       // TODO: add select and timeout if queue is full
+       e.queue <- WorkItem{
+               transactionId:    transactionId,
+               runnable:         runnable,
+               completionFuture: completionFuture,
+       }
+       log.Trace().Int32("transactionId", transactionId).Msg("runnable queued")
+       return completionFuture
+}
+
+func (e *Executor) Start() {
+       e.stateChange.Lock()
+       defer e.stateChange.Unlock()
+       if e.running {
+               return
+       }
+       e.running = true
+       e.shutdown = false
+       for i := 0; i < len(e.worker); i++ {
+               worker := e.worker[i]
+               go worker.work()
+       }
+}
+
+func (e *Executor) Stop() {
+       e.stateChange.Lock()
+       defer e.stateChange.Unlock()
+       if !e.running {
+               return
+       }
+       e.shutdown = true
+       close(e.queue)
+       for i := 0; i < len(e.worker); i++ {
+               worker := e.worker[i]
+               worker.shutdown = true
+               worker.interrupted = true
+       }
+       e.running = false
+}
+
+type CompletionFuture struct {
+       cancelRequested    bool
+       interruptRequested bool
+       completed          bool
+       errored            bool
+       err                error
+}
+
+func (f *CompletionFuture) Cancel(interrupt bool, err error) {
+       f.cancelRequested = true
+       f.interruptRequested = interrupt
+       f.errored = true
+       f.err = err
+}
+
+func (f *CompletionFuture) Complete() {
+       f.completed = true
+}
+
+func (f *CompletionFuture) AwaitCompletion() error {
+       for !f.completed && !f.errored {
+               time.Sleep(time.Millisecond * 10)
+       }
+       return f.err
+}

Reply via email to