This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new ab70d1b11a feat(plc4go/spi): introduce interfaces for request transaction manager
ab70d1b11a is described below

commit ab70d1b11a264209515e4f1c9e7c6b03d68ccbd8
Author: Sebastian Rühl <[email protected]>
AuthorDate: Wed Apr 5 16:25:36 2023 +0200

    feat(plc4go/spi): introduce interfaces for request transaction manager
---
 plc4go/internal/bacnetip/Connection.go         |   4 +-
 plc4go/internal/bacnetip/Driver.go             |   4 +-
 plc4go/internal/bacnetip/Reader.go             |   4 +-
 plc4go/internal/cbus/CBusMessageMapper.go      |   2 +-
 plc4go/internal/cbus/CBusMessageMapper_test.go |   6 +-
 plc4go/internal/cbus/Connection.go             |   4 +-
 plc4go/internal/cbus/Driver.go                 |   4 +-
 plc4go/internal/cbus/Reader.go                 |   6 +-
 plc4go/internal/cbus/Writer.go                 |   4 +-
 plc4go/internal/eip/Connection.go              |   4 +-
 plc4go/internal/eip/EipDriver.go               |   4 +-
 plc4go/internal/eip/Reader.go                  |   4 +-
 plc4go/internal/eip/Writer.go                  |   4 +-
 plc4go/internal/s7/Connection.go               |   4 +-
 plc4go/internal/s7/Driver.go                   |   4 +-
 plc4go/internal/s7/Reader.go                   |   4 +-
 plc4go/internal/s7/Writer.go                   |   4 +-
 plc4go/spi/RequestTransactionManager.go        | 124 +++++++++++++++----------
 plc4go/spi/testutils/DriverTestRunner.go       |   2 +
 19 files changed, 112 insertions(+), 84 deletions(-)

diff --git a/plc4go/internal/bacnetip/Connection.go b/plc4go/internal/bacnetip/Connection.go
index 30113f1a14..7354d61fe3 100644
--- a/plc4go/internal/bacnetip/Connection.go
+++ b/plc4go/internal/bacnetip/Connection.go
@@ -39,13 +39,13 @@ type Connection struct {
        invokeIdGenerator InvokeIdGenerator
        messageCodec      spi.MessageCodec
        subscribers       []*Subscriber
-       tm                *spi.RequestTransactionManager
+       tm                spi.RequestTransactionManager
 
        connectionId string
        tracer       *spi.Tracer
 }
 
-func NewConnection(messageCodec spi.MessageCodec, tagHandler spi.PlcTagHandler, tm *spi.RequestTransactionManager, options map[string][]string) *Connection {
+func NewConnection(messageCodec spi.MessageCodec, tagHandler spi.PlcTagHandler, tm spi.RequestTransactionManager, options map[string][]string) *Connection {
        connection := &Connection{
                invokeIdGenerator: InvokeIdGenerator{currentInvokeId: 0},
                messageCodec:      messageCodec,
diff --git a/plc4go/internal/bacnetip/Driver.go b/plc4go/internal/bacnetip/Driver.go
index 2803a73c58..afa7fbe606 100644
--- a/plc4go/internal/bacnetip/Driver.go
+++ b/plc4go/internal/bacnetip/Driver.go
@@ -53,7 +53,7 @@ func NewDriver() plc4go.PlcDriver {
                applicationManager: ApplicationManager{
                        applications: 
map[string]*ApplicationLayerMessageCodec{},
                },
-               tm:                      
*spi.NewRequestTransactionManager(math.MaxInt),
+               tm:                      
spi.NewRequestTransactionManager(math.MaxInt),
                awaitSetupComplete:      true,
                awaitDisconnectComplete: true,
        }
@@ -103,7 +103,7 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
        log.Debug().Msgf("working with codec %#v", codec)
 
        // Create the new connection
-       connection := NewConnection(codec, m.GetPlcTagHandler(), &m.tm, options)
+       connection := NewConnection(codec, m.GetPlcTagHandler(), m.tm, options)
        log.Debug().Msg("created connection, connecting now")
        return connection.ConnectWithContext(ctx)
 }
diff --git a/plc4go/internal/bacnetip/Reader.go 
b/plc4go/internal/bacnetip/Reader.go
index f965017fd8..d67378150e 100644
--- a/plc4go/internal/bacnetip/Reader.go
+++ b/plc4go/internal/bacnetip/Reader.go
@@ -37,13 +37,13 @@ import (
 type Reader struct {
        invokeIdGenerator *InvokeIdGenerator
        messageCodec      spi.MessageCodec
-       tm                *spi.RequestTransactionManager
+       tm                spi.RequestTransactionManager
 
        maxSegmentsAccepted   readWriteModel.MaxSegmentsAccepted
        maxApduLengthAccepted readWriteModel.MaxApduLengthAccepted
 }
 
-func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec 
spi.MessageCodec, tm *spi.RequestTransactionManager) *Reader {
+func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec 
spi.MessageCodec, tm spi.RequestTransactionManager) *Reader {
        return &Reader{
                invokeIdGenerator: invokeIdGenerator,
                messageCodec:      messageCodec,
diff --git a/plc4go/internal/cbus/CBusMessageMapper.go 
b/plc4go/internal/cbus/CBusMessageMapper.go
index 672a268bc6..8c9d963c7d 100644
--- a/plc4go/internal/cbus/CBusMessageMapper.go
+++ b/plc4go/internal/cbus/CBusMessageMapper.go
@@ -252,7 +252,7 @@ func producePointToPointCommand(unitAddress 
readWriteModel.UnitAddress, bridgeAd
        return readWriteModel.NewCBusPointToPointCommandDirect(unitAddress, 
0x0000, calData, cbusOptions), nil
 }
 
-func MapEncodedReply(transaction *spi.RequestTransaction, encodedReply 
readWriteModel.EncodedReply, tagName string, addResponseCode func(name string, 
responseCode apiModel.PlcResponseCode), addPlcValue func(name string, plcValue 
apiValues.PlcValue)) error {
+func MapEncodedReply(transaction spi.RequestTransaction, encodedReply 
readWriteModel.EncodedReply, tagName string, addResponseCode func(name string, 
responseCode apiModel.PlcResponseCode), addPlcValue func(name string, plcValue 
apiValues.PlcValue)) error {
        switch reply := encodedReply.(type) {
        case readWriteModel.EncodedReplyCALReplyExactly:
                calData := reply.GetCalReply().GetCalData()
diff --git a/plc4go/internal/cbus/CBusMessageMapper_test.go 
b/plc4go/internal/cbus/CBusMessageMapper_test.go
index 2a206ac804..d9987f2ed0 100644
--- a/plc4go/internal/cbus/CBusMessageMapper_test.go
+++ b/plc4go/internal/cbus/CBusMessageMapper_test.go
@@ -292,7 +292,7 @@ func TestTagToCBusMessage(t *testing.T) {
 
 func TestMapEncodedReply(t *testing.T) {
        type args struct {
-               transaction     *spi.RequestTransaction
+               transaction     spi.RequestTransaction
                encodedReply    readWriteModel.EncodedReply
                tagName         string
                addResponseCode func(name string, responseCode 
apiModel.PlcResponseCode)
@@ -306,7 +306,7 @@ func TestMapEncodedReply(t *testing.T) {
                {
                        name: "empty input",
                        args: args{
-                               transaction: func() *spi.RequestTransaction {
+                               transaction: func() spi.RequestTransaction {
                                        transactionManager := 
spi.NewRequestTransactionManager(1)
                                        transaction := 
transactionManager.StartTransaction()
                                        transaction.Submit(func() {
@@ -323,7 +323,7 @@ func TestMapEncodedReply(t *testing.T) {
                {
                        name: "CALDataStatus",
                        args: args{
-                               transaction: func() *spi.RequestTransaction {
+                               transaction: func() spi.RequestTransaction {
                                        transactionManager := 
spi.NewRequestTransactionManager(1)
                                        transaction := 
transactionManager.StartTransaction()
                                        transaction.Submit(func() {
diff --git a/plc4go/internal/cbus/Connection.go 
b/plc4go/internal/cbus/Connection.go
index 91062f1baa..4692f94ee5 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -58,7 +58,7 @@ type Connection struct {
        alphaGenerator AlphaGenerator
        messageCodec   spi.MessageCodec
        subscribers    []*Subscriber
-       tm             *spi.RequestTransactionManager
+       tm             spi.RequestTransactionManager
 
        configuration Configuration
        driverContext DriverContext
@@ -67,7 +67,7 @@ type Connection struct {
        tracer       *spi.Tracer
 }
 
-func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, 
driverContext DriverContext, tagHandler spi.PlcTagHandler, tm 
*spi.RequestTransactionManager, options map[string][]string) *Connection {
+func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, 
driverContext DriverContext, tagHandler spi.PlcTagHandler, tm 
spi.RequestTransactionManager, options map[string][]string) *Connection {
        connection := &Connection{
                alphaGenerator: AlphaGenerator{currentAlpha: 'g'},
                messageCodec:   messageCodec,
diff --git a/plc4go/internal/cbus/Driver.go b/plc4go/internal/cbus/Driver.go
index 9dd7302765..3a4facc222 100644
--- a/plc4go/internal/cbus/Driver.go
+++ b/plc4go/internal/cbus/Driver.go
@@ -44,7 +44,7 @@ type Driver struct {
 
 func NewDriver() plc4go.PlcDriver {
        driver := &Driver{
-               tm:                      *spi.NewRequestTransactionManager(1),
+               tm:                      spi.NewRequestTransactionManager(1),
                awaitSetupComplete:      true,
                awaitDisconnectComplete: true,
        }
@@ -103,7 +103,7 @@ func (m *Driver) GetConnectionWithContext(ctx 
context.Context, transportUrl url.
        driverContext.awaitDisconnectComplete = m.awaitDisconnectComplete
 
        // Create the new connection
-       connection := NewConnection(codec, configuration, driverContext, 
m.GetPlcTagHandler(), &m.tm, options)
+       connection := NewConnection(codec, configuration, driverContext, 
m.GetPlcTagHandler(), m.tm, options)
        log.Debug().Msg("created connection, connecting now")
        return connection.ConnectWithContext(ctx)
 }
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 7b6cf2dfe6..952f639e23 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -36,10 +36,10 @@ import (
 type Reader struct {
        alphaGenerator *AlphaGenerator
        messageCodec   spi.MessageCodec
-       tm             *spi.RequestTransactionManager
+       tm             spi.RequestTransactionManager
 }
 
-func NewReader(tpduGenerator *AlphaGenerator, messageCodec spi.MessageCodec, 
tm *spi.RequestTransactionManager) *Reader {
+func NewReader(tpduGenerator *AlphaGenerator, messageCodec spi.MessageCodec, 
tm spi.RequestTransactionManager) *Reader {
        return &Reader{
                alphaGenerator: tpduGenerator,
                messageCodec:   messageCodec,
@@ -135,7 +135,7 @@ func (m *Reader) readSync(ctx context.Context, readRequest 
apiModel.PlcReadReque
                                }
                                return 
confirmation.GetConfirmation().GetAlpha().GetCharacter() == 
messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(readWriteModel.RequestCommand).GetAlpha().GetCharacter()
                        }, func(receivedMessage spi.Message) error {
-                               defer func(transaction *spi.RequestTransaction) 
{
+                               defer func(transaction spi.RequestTransaction) {
                                        // This is just to make sure we don't 
forget to close the transaction here
                                        _ = transaction.EndRequest()
                                }(transaction)
diff --git a/plc4go/internal/cbus/Writer.go b/plc4go/internal/cbus/Writer.go
index e9257051fe..9029551317 100644
--- a/plc4go/internal/cbus/Writer.go
+++ b/plc4go/internal/cbus/Writer.go
@@ -35,10 +35,10 @@ import (
 type Writer struct {
        alphaGenerator *AlphaGenerator
        messageCodec   spi.MessageCodec
-       tm             *spi.RequestTransactionManager
+       tm             spi.RequestTransactionManager
 }
 
-func NewWriter(tpduGenerator *AlphaGenerator, messageCodec spi.MessageCodec, 
tm *spi.RequestTransactionManager) Writer {
+func NewWriter(tpduGenerator *AlphaGenerator, messageCodec spi.MessageCodec, 
tm spi.RequestTransactionManager) Writer {
        return Writer{
                alphaGenerator: tpduGenerator,
                messageCodec:   messageCodec,
diff --git a/plc4go/internal/eip/Connection.go 
b/plc4go/internal/eip/Connection.go
index b3c7ccd7ce..cbff5440f4 100644
--- a/plc4go/internal/eip/Connection.go
+++ b/plc4go/internal/eip/Connection.go
@@ -45,7 +45,7 @@ type Connection struct {
        messageCodec              spi.MessageCodec
        configuration             Configuration
        driverContext             DriverContext
-       tm                        *spi.RequestTransactionManager
+       tm                        spi.RequestTransactionManager
        sessionHandle             uint32
        senderContext             []uint8
        connectionId              uint32
@@ -58,7 +58,7 @@ type Connection struct {
        tracer                    *spi.Tracer
 }
 
-func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, 
driverContext DriverContext, tagHandler spi.PlcTagHandler, tm 
*spi.RequestTransactionManager, options map[string][]string) *Connection {
+func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, 
driverContext DriverContext, tagHandler spi.PlcTagHandler, tm 
spi.RequestTransactionManager, options map[string][]string) *Connection {
        connection := &Connection{
                messageCodec:  messageCodec,
                configuration: configuration,
diff --git a/plc4go/internal/eip/EipDriver.go b/plc4go/internal/eip/EipDriver.go
index 0483d3b25e..336fbf7808 100644
--- a/plc4go/internal/eip/EipDriver.go
+++ b/plc4go/internal/eip/EipDriver.go
@@ -40,7 +40,7 @@ type Driver struct {
 
 func NewDriver() plc4go.PlcDriver {
        driver := &Driver{
-               tm:                      *spi.NewRequestTransactionManager(1),
+               tm:                      spi.NewRequestTransactionManager(1),
                awaitSetupComplete:      true,
                awaitDisconnectComplete: true,
        }
@@ -99,7 +99,7 @@ func (m *Driver) GetConnectionWithContext(ctx 
context.Context, transportUrl url.
        driverContext.awaitDisconnectComplete = m.awaitDisconnectComplete
 
        // Create the new connection
-       connection := NewConnection(codec, configuration, driverContext, 
m.GetPlcTagHandler(), &m.tm, options)
+       connection := NewConnection(codec, configuration, driverContext, 
m.GetPlcTagHandler(), m.tm, options)
        log.Debug().Msg("created connection, connecting now")
        return connection.ConnectWithContext(ctx)
 }
diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go
index 2cf06398c8..6a77f8f6cc 100644
--- a/plc4go/internal/eip/Reader.go
+++ b/plc4go/internal/eip/Reader.go
@@ -40,12 +40,12 @@ import (
 
 type Reader struct {
        messageCodec  spi.MessageCodec
-       tm            *spi.RequestTransactionManager
+       tm            spi.RequestTransactionManager
        configuration Configuration
        sessionHandle *uint32
 }
 
-func NewReader(messageCodec spi.MessageCodec, tm 
*spi.RequestTransactionManager, configuration Configuration, sessionHandle 
*uint32) *Reader {
+func NewReader(messageCodec spi.MessageCodec, tm 
spi.RequestTransactionManager, configuration Configuration, sessionHandle 
*uint32) *Reader {
        return &Reader{
                messageCodec:  messageCodec,
                tm:            tm,
diff --git a/plc4go/internal/eip/Writer.go b/plc4go/internal/eip/Writer.go
index 209a318c4c..b8d2d10d6e 100644
--- a/plc4go/internal/eip/Writer.go
+++ b/plc4go/internal/eip/Writer.go
@@ -36,13 +36,13 @@ import (
 
 type Writer struct {
        messageCodec  spi.MessageCodec
-       tm            *spi.RequestTransactionManager
+       tm            spi.RequestTransactionManager
        configuration Configuration
        sessionHandle *uint32
        senderContext *[]uint8
 }
 
-func NewWriter(messageCodec spi.MessageCodec, tm 
*spi.RequestTransactionManager, configuration Configuration, sessionHandle 
*uint32, senderContext *[]uint8) Writer {
+func NewWriter(messageCodec spi.MessageCodec, tm 
spi.RequestTransactionManager, configuration Configuration, sessionHandle 
*uint32, senderContext *[]uint8) Writer {
        return Writer{
                messageCodec:  messageCodec,
                tm:            tm,
diff --git a/plc4go/internal/s7/Connection.go b/plc4go/internal/s7/Connection.go
index 97e00e90a8..819a72a4a4 100644
--- a/plc4go/internal/s7/Connection.go
+++ b/plc4go/internal/s7/Connection.go
@@ -60,13 +60,13 @@ type Connection struct {
        messageCodec  spi.MessageCodec
        configuration Configuration
        driverContext DriverContext
-       tm            *spi.RequestTransactionManager
+       tm            spi.RequestTransactionManager
 
        connectionId string
        tracer       *spi.Tracer
 }
 
-func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, 
driverContext DriverContext, tagHandler spi.PlcTagHandler, tm 
*spi.RequestTransactionManager, options map[string][]string) *Connection {
+func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, 
driverContext DriverContext, tagHandler spi.PlcTagHandler, tm 
spi.RequestTransactionManager, options map[string][]string) *Connection {
        connection := &Connection{
                tpduGenerator: TpduGenerator{currentTpduId: 10},
                messageCodec:  messageCodec,
diff --git a/plc4go/internal/s7/Driver.go b/plc4go/internal/s7/Driver.go
index edb3801100..4c79b80457 100644
--- a/plc4go/internal/s7/Driver.go
+++ b/plc4go/internal/s7/Driver.go
@@ -40,7 +40,7 @@ type Driver struct {
 
 func NewDriver() plc4go.PlcDriver {
        driver := &Driver{
-               tm:                      *spi.NewRequestTransactionManager(1),
+               tm:                      spi.NewRequestTransactionManager(1),
                awaitSetupComplete:      true,
                awaitDisconnectComplete: true,
        }
@@ -99,7 +99,7 @@ func (m *Driver) GetConnectionWithContext(ctx 
context.Context, transportUrl url.
        driverContext.awaitDisconnectComplete = m.awaitDisconnectComplete
 
        // Create the new connection
-       connection := NewConnection(codec, configuration, driverContext, 
m.GetPlcTagHandler(), &m.tm, options)
+       connection := NewConnection(codec, configuration, driverContext, 
m.GetPlcTagHandler(), m.tm, options)
        log.Debug().Msg("created connection, connecting now")
        return connection.ConnectWithContext(ctx)
 }
diff --git a/plc4go/internal/s7/Reader.go b/plc4go/internal/s7/Reader.go
index a0c3aca945..3379200e78 100644
--- a/plc4go/internal/s7/Reader.go
+++ b/plc4go/internal/s7/Reader.go
@@ -36,10 +36,10 @@ import (
 type Reader struct {
        tpduGenerator *TpduGenerator
        messageCodec  spi.MessageCodec
-       tm            *spi.RequestTransactionManager
+       tm            spi.RequestTransactionManager
 }
 
-func NewReader(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm 
*spi.RequestTransactionManager) *Reader {
+func NewReader(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm 
spi.RequestTransactionManager) *Reader {
        return &Reader{
                tpduGenerator: tpduGenerator,
                messageCodec:  messageCodec,
diff --git a/plc4go/internal/s7/Writer.go b/plc4go/internal/s7/Writer.go
index cd33c379e7..e8bf221953 100644
--- a/plc4go/internal/s7/Writer.go
+++ b/plc4go/internal/s7/Writer.go
@@ -35,10 +35,10 @@ import (
 type Writer struct {
        tpduGenerator *TpduGenerator
        messageCodec  spi.MessageCodec
-       tm            *spi.RequestTransactionManager
+       tm            spi.RequestTransactionManager
 }
 
-func NewWriter(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm 
*spi.RequestTransactionManager) Writer {
+func NewWriter(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm 
spi.RequestTransactionManager) Writer {
        return Writer{
                tpduGenerator: tpduGenerator,
                messageCodec:  messageCodec,
diff --git a/plc4go/spi/RequestTransactionManager.go b/plc4go/spi/RequestTransactionManager.go
index aca4dc4d41..1d4e93dde7 100644
--- a/plc4go/spi/RequestTransactionManager.go
+++ b/plc4go/spi/RequestTransactionManager.go
@@ -42,60 +42,87 @@ func init() {
        sharedExecutorInstance.Start()
 }
 
-type RequestTransaction struct {
-       parent        *RequestTransactionManager
-       transactionId int32
-
-       /** The initial operation to perform to kick off the request */
-       operation        utils.Runnable
-       completionFuture utils.CompletionFuture
-
-       transactionLog zerolog.Logger
-}
-
-func (t *RequestTransaction) String() string {
-       return fmt.Sprintf("Transaction{tid:%d}", t.transactionId)
+// RequestTransaction represents a transaction
+type RequestTransaction interface {
+       fmt.Stringer
+       // FailRequest signals that this transaction has failed
+       FailRequest(err error) error
+       // EndRequest signals that this transaction is done
+       EndRequest() error
+       // Submit submits a Runnable to the RequestTransactionManager
+       Submit(operation utils.Runnable)
+       // AwaitCompletion wait for this RequestTransaction to finish. Returns an error if it finished unsuccessful
+       AwaitCompletion(ctx context.Context) error
 }
 
 // RequestTransactionManager handles transactions
-type RequestTransactionManager struct {
-       runningRequests []*RequestTransaction
-       // How many Transactions are allowed to run at the same time?
-       numberOfConcurrentRequests int
-       // Assigns each request a Unique Transaction Id, especially important 
for failure handling
-       transactionId    int32
-       transactionMutex sync.RWMutex
-       // Important, this is a FIFO Queue for Fairness!
-       workLog      list.List
-       workLogMutex sync.RWMutex
-       executor     utils.Executor
+type RequestTransactionManager interface {
+       // SetNumberOfConcurrentRequests sets the number of concurrent requests 
that will be sent out to a device
+       SetNumberOfConcurrentRequests(numberOfConcurrentRequests int)
+       // StartTransaction starts a RequestTransaction
+       StartTransaction() RequestTransaction
 }
 
 // NewRequestTransactionManager creates a new RequestTransactionManager
-func NewRequestTransactionManager(numberOfConcurrentRequests int, 
requestTransactionManagerOptions ...RequestTransactionManagerOption) 
*RequestTransactionManager {
-       requestTransactionManager := &RequestTransactionManager{
+func NewRequestTransactionManager(numberOfConcurrentRequests int, 
requestTransactionManagerOptions ...RequestTransactionManagerOption) 
RequestTransactionManager {
+       _requestTransactionManager := &requestTransactionManager{
                numberOfConcurrentRequests: numberOfConcurrentRequests,
                transactionId:              0,
                workLog:                    *list.New(),
                executor:                   sharedExecutorInstance,
        }
        for _, requestTransactionManagerOption := range 
requestTransactionManagerOptions {
-               requestTransactionManagerOption(requestTransactionManager)
+               requestTransactionManagerOption(_requestTransactionManager)
        }
-       return requestTransactionManager
+       return _requestTransactionManager
 }
 
-type RequestTransactionManagerOption func(requestTransactionManager 
*RequestTransactionManager)
+type RequestTransactionManagerOption func(requestTransactionManager 
*requestTransactionManager)
 
 // WithCustomExecutor sets a custom Executor for the RequestTransactionManager
 func WithCustomExecutor(executor utils.Executor) 
RequestTransactionManagerOption {
-       return func(requestTransactionManager *RequestTransactionManager) {
+       return func(requestTransactionManager *requestTransactionManager) {
                requestTransactionManager.executor = executor
        }
 }
 
-// SetNumberOfConcurrentRequests sets the number of concurrent requests that 
will be sent out to a device
-func (r *RequestTransactionManager) 
SetNumberOfConcurrentRequests(numberOfConcurrentRequests int) {
+///////////////////////////////////////
+///////////////////////////////////////
+//
+// Internal section
+//
+
+type requestTransaction struct {
+       parent        *requestTransactionManager
+       transactionId int32
+
+       /** The initial operation to perform to kick off the request */
+       operation        utils.Runnable
+       completionFuture utils.CompletionFuture
+
+       transactionLog zerolog.Logger
+}
+
+type requestTransactionManager struct {
+       runningRequests []*requestTransaction
+       // How many Transactions are allowed to run at the same time?
+       numberOfConcurrentRequests int
+       // Assigns each request a Unique Transaction Id, especially important 
for failure handling
+       transactionId    int32
+       transactionMutex sync.RWMutex
+       // Important, this is a FIFO Queue for Fairness!
+       workLog      list.List
+       workLogMutex sync.RWMutex
+       executor     utils.Executor
+}
+
+//
+// Internal section
+//
+///////////////////////////////////////
+///////////////////////////////////////
+
+func (r *requestTransactionManager) 
SetNumberOfConcurrentRequests(numberOfConcurrentRequests int) {
        log.Info().Msgf("Setting new number of concurrent requests %d", 
numberOfConcurrentRequests)
        // If we reduced the number of concurrent requests and more requests 
are in-flight
        // than should be, at least log a warning.
@@ -109,7 +136,7 @@ func (r *RequestTransactionManager) 
SetNumberOfConcurrentRequests(numberOfConcur
        r.processWorklog()
 }
 
-func (r *RequestTransactionManager) submitHandle(handle *RequestTransaction) {
+func (r *requestTransactionManager) submitHandle(handle *requestTransaction) {
        if handle.operation == nil {
                panic("invalid handle")
        }
@@ -122,7 +149,7 @@ func (r *RequestTransactionManager) submitHandle(handle 
*RequestTransaction) {
        r.processWorklog()
 }
 
-func (r *RequestTransactionManager) processWorklog() {
+func (r *requestTransactionManager) processWorklog() {
        r.workLogMutex.RLock()
        defer r.workLogMutex.RUnlock()
        log.Debug().Msgf("Processing work log with size of %d (%d concurrent 
requests allowed)", r.workLog.Len(), r.numberOfConcurrentRequests)
@@ -131,7 +158,7 @@ func (r *RequestTransactionManager) processWorklog() {
                if front == nil {
                        return
                }
-               next := front.Value.(*RequestTransaction)
+               next := front.Value.(*requestTransaction)
                log.Debug().Msgf("Handling next %v. (Adding to running requests 
(length: %d))", next, len(r.runningRequests))
                r.runningRequests = append(r.runningRequests, next)
                completionFuture := r.executor.Submit(context.Background(), 
next.transactionId, next.operation)
@@ -140,8 +167,7 @@ func (r *RequestTransactionManager) processWorklog() {
        }
 }
 
-// StartTransaction starts a RequestTransaction
-func (r *RequestTransactionManager) StartTransaction() *RequestTransaction {
+func (r *requestTransactionManager) StartTransaction() RequestTransaction {
        r.transactionMutex.Lock()
        defer r.transactionMutex.Unlock()
        currentTransactionId := r.transactionId
@@ -150,7 +176,7 @@ func (r *RequestTransactionManager) StartTransaction() 
*RequestTransaction {
        if !config.TraceTransactionManagerTransactions {
                transactionLogger = zerolog.Nop()
        }
-       return &RequestTransaction{
+       return &requestTransaction{
                r,
                currentTransactionId,
                nil,
@@ -159,18 +185,18 @@ func (r *RequestTransactionManager) StartTransaction() 
*RequestTransaction {
        }
 }
 
-func (r *RequestTransactionManager) getNumberOfActiveRequests() int {
+func (r *requestTransactionManager) getNumberOfActiveRequests() int {
        return len(r.runningRequests)
 }
 
-func (r *RequestTransactionManager) failRequest(transaction 
*RequestTransaction, err error) error {
+func (r *requestTransactionManager) failRequest(transaction 
*requestTransaction, err error) error {
        // Try to fail it!
        transaction.completionFuture.Cancel(true, err)
        // End it
        return r.endRequest(transaction)
 }
 
-func (r *RequestTransactionManager) endRequest(transaction 
*RequestTransaction) error {
+func (r *requestTransactionManager) endRequest(transaction 
*requestTransaction) error {
        transaction.transactionLog.Debug().Msg("Trying to find a existing 
transaction")
        found := false
        index := -1
@@ -193,21 +219,18 @@ func (r *RequestTransactionManager) 
endRequest(transaction *RequestTransaction)
        return nil
 }
 
-// FailRequest signals that this transaction has failed
-func (t *RequestTransaction) FailRequest(err error) error {
+func (t *requestTransaction) FailRequest(err error) error {
        t.transactionLog.Trace().Msg("Fail the request")
        return t.parent.failRequest(t, err)
 }
 
-// EndRequest signals that this transaction is done
-func (t *RequestTransaction) EndRequest() error {
+func (t *requestTransaction) EndRequest() error {
        t.transactionLog.Trace().Msg("Ending the request")
        // Remove it from Running Requests
        return t.parent.endRequest(t)
 }
 
-// Submit submits a Runnable to the RequestTransactionManager
-func (t *RequestTransaction) Submit(operation utils.Runnable) {
+func (t *requestTransaction) Submit(operation utils.Runnable) {
        if t.operation != nil {
                panic("Operation already set")
        }
@@ -220,8 +243,7 @@ func (t *RequestTransaction) Submit(operation 
utils.Runnable) {
        t.parent.submitHandle(t)
 }
 
-// AwaitCompletion wait for this RequestTransaction to finish. Returns an 
error if it finished unsuccessful
-func (t *RequestTransaction) AwaitCompletion(ctx context.Context) error {
+func (t *requestTransaction) AwaitCompletion(ctx context.Context) error {
        for t.completionFuture == nil {
                time.Sleep(time.Millisecond * 10)
        }
@@ -240,3 +262,7 @@ func (t *RequestTransaction) AwaitCompletion(ctx 
context.Context) error {
        }
        return nil
 }
+
+func (t *requestTransaction) String() string {
+       return fmt.Sprintf("Transaction{tid:%d}", t.transactionId)
+}
diff --git a/plc4go/spi/testutils/DriverTestRunner.go b/plc4go/spi/testutils/DriverTestRunner.go
index 6e7969e433..9ee445e7cc 100644
--- a/plc4go/spi/testutils/DriverTestRunner.go
+++ b/plc4go/spi/testutils/DriverTestRunner.go
@@ -549,7 +549,9 @@ func RunDriverTestsuiteWithOptions(t *testing.T, driver 
plc4go.PlcDriver, testPa
 }
 
 type ConnectionConnectAwaiter interface {
+       // SetAwaitSetupComplete sets a flag that the driver should await a connection completion
        SetAwaitSetupComplete(awaitComplete bool)
+       // SetAwaitDisconnectComplete sets a flag that the driver should await a dis-connection completion
        SetAwaitDisconnectComplete(awaitComplete bool)
 }
 

Reply via email to