This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new cabc35dd35 fix(plc4go/cbus): limit discoverer with semaphore
cabc35dd35 is described below

commit cabc35dd353132b8cecdf1438cd0c98f43d85957
Author: Sebastian Rühl <[email protected]>
AuthorDate: Tue Mar 21 11:52:04 2023 +0100

    fix(plc4go/cbus): limit discoverer with semaphore
---
 plc4go/internal/cbus/Discoverer.go | 22 +++++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git a/plc4go/internal/cbus/Discoverer.go 
b/plc4go/internal/cbus/Discoverer.go
index 2d3da0f9c1..786fa5930f 100644
--- a/plc4go/internal/cbus/Discoverer.go
+++ b/plc4go/internal/cbus/Discoverer.go
@@ -23,6 +23,7 @@ import (
        "context"
        "fmt"
        "github.com/apache/plc4x/plc4go/spi/transports/tcp"
+       "golang.org/x/sync/semaphore"
        "net"
        "net/url"
        "sync"
@@ -30,7 +31,6 @@ import (
 
        apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
        readWriteModel 
"github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
-       "github.com/apache/plc4x/plc4go/spi"
        internalModel "github.com/apache/plc4x/plc4go/spi/model"
        "github.com/apache/plc4x/plc4go/spi/options"
        "github.com/apache/plc4x/plc4go/spi/transports"
@@ -40,11 +40,13 @@ import (
 )
 
 type Discoverer struct {
-       messageCodec spi.MessageCodec
+       maxConcurrency int64
 }
 
 func NewDiscoverer() *Discoverer {
-       return &Discoverer{}
+       return &Discoverer{
+               maxConcurrency: 50,
+       }
 }
 
 func (d *Discoverer) Discover(ctx context.Context, callback func(event 
apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) 
error {
@@ -74,6 +76,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback 
func(event apiModel.
        }
 
        transportInstances := make(chan transports.TransportInstance)
+       sem := semaphore.NewWeighted(d.maxConcurrency)
        wg := &sync.WaitGroup{}
        // Iterate over all network devices of this system.
        for _, netInterface := range interfaces {
@@ -105,7 +108,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback 
func(event apiModel.
                                if ipv4Addr == nil || ipv4Addr.IsLoopback() {
                                        continue
                                }
-                               addresses, err := 
utils.GetIPAddresses(context.TODO(), netInterface, false)
+                               addresses, err := utils.GetIPAddresses(ctx, 
netInterface, false)
                                if err != nil {
                                        log.Warn().Err(err).Msgf("Can't get 
addresses for %v", netInterface)
                                        continue
@@ -117,7 +120,12 @@ func (d *Discoverer) Discover(ctx context.Context, 
callback func(event apiModel.
                                                log.Trace().Msgf("Handling 
found ip %v", ip)
                                                wg.Add(1)
                                                go func(ip net.IP) {
-                                                       defer func() { 
wg.Done() }()
+                                                       if err := 
sem.Acquire(ctx, 1); err != nil {
+                                                               
log.Debug().Err(err).Msg("Error acquiring")
+                                                               return
+                                                       }
+                                                       defer sem.Release(1)
+                                                       defer wg.Done()
                                                        // Create a new 
"connection" (Actually open a local udp socket and target outgoing packets to 
that address)
                                                        var connectionUrl 
url.URL
                                                        {
@@ -225,7 +233,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback 
func(event apiModel.
                                        }
                                        var remoteUrl url.URL
                                        {
-                                               // TODO: we could check for the 
exact reponse
+                                               // TODO: we could check for the 
exact response
                                                remoteUrlParse, err := 
url.Parse(fmt.Sprintf("tcp://%s", tcpTransportInstance.RemoteAddress))
                                                if err != nil {
                                                        
log.Error().Err(err).Msg("Error creating url")
@@ -233,7 +241,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback 
func(event apiModel.
                                                }
                                                remoteUrl = *remoteUrlParse
                                        }
-                                       // TODO: manufaturer + type would be 
good but this means two requests then
+                                       // TODO: manufacturer + type would be 
good but this means two requests then
                                        deviceName := 
identifyReplyCommand.GetManufacturerName()
                                        discoveryEvent := 
&internalModel.DefaultPlcDiscoveryItem{
                                                ProtocolCode:  "c-bus",

Reply via email to