This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 53cc3a1f75 fix(#3727): Implement failure and recovery strategy in ContinuousPlcRequestReader (#3728)
53cc3a1f75 is described below

commit 53cc3a1f75db4b9c7dbb8e6c355fe60ef815fc1b
Author: Philipp Zehnder <[email protected]>
AuthorDate: Wed Aug 13 13:33:55 2025 +0200

    fix(#3727): Implement failure and recovery strategy in ContinuousPlcRequestReader (#3728)
---
 .../connection/ContinuousPlcRequestReader.java     | 78 ++++++++++++++++++----
 1 file changed, 65 insertions(+), 13 deletions(-)

diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
index b75cafc7bd..639f4e0534 100644
--- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
+++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
@@ -26,6 +26,7 @@ import 
org.apache.streampipes.extensions.management.connect.adapter.util.Polling
 
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.PlcConnectionManager;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,32 +37,83 @@ public class ContinuousPlcRequestReader
     extends OneTimePlcRequestReader implements IPullAdapter {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousPlcRequestReader.class);
+  private static final int MAX_IDLE_PULLS = 300;
 
   private final IEventCollector collector;
+  private int idlePullsBeforeNextAttempt = 0;
+  private int currentIdlePulls = 0;
 
-  public ContinuousPlcRequestReader(PlcConnectionManager connectionManager,
-                                    Plc4xConnectionSettings settings,
-                                    PlcRequestProvider requestProvider,
-                                    IEventCollector collector) {
+  /**
+   *  Failure and recovery strategy:
+   * - If a read fails, the number of idle pulls before the next attempt is 
doubled, up to a maximum of 300.
+   * - If the read is successful, the idle pull counter is reset.
+   */
+  public ContinuousPlcRequestReader(
+      PlcConnectionManager connectionManager,
+      Plc4xConnectionSettings settings,
+      PlcRequestProvider requestProvider,
+      IEventCollector collector
+  ) {
     super(connectionManager, settings, requestProvider);
     this.collector = collector;
   }
 
   @Override
   public void pullData() throws RuntimeException {
+    if (currentIdlePulls < idlePullsBeforeNextAttempt) {
+      idleRead();
+    } else {
+      connectAndReadPlcData();
+    }
+  }
+
+  private void connectAndReadPlcData() {
     try (PlcConnection plcConnection = 
connectionManager.getConnection(settings.connectionString())) {
       var readRequest = requestProvider.makeReadRequest(plcConnection, 
settings.nodes());
-      var readResponse = readRequest.execute().get(5000, 
TimeUnit.MILLISECONDS);
-      var event = eventGenerator.makeEvent(readResponse);
-      collector.collect(event);
+      var readResponse = readRequest.execute()
+                                    .get(5000, TimeUnit.MILLISECONDS);
+      processPlcReadResponse(readResponse);
     } catch (Exception e) {
-      // ensure that the cached connection manager removes the broken 
connection
-      if (connectionManager instanceof CachedPlcConnectionManager) {
-        ((CachedPlcConnectionManager) 
connectionManager).removeCachedConnection(settings.connectionString());
-      }
-      LOG.error("Error while reading from PLC with connection string {}: {} ",
-          settings.connectionString(), e.getMessage());
+      handleFailingPlcRead(e);
+    }
+  }
+
+  private void processPlcReadResponse(PlcReadResponse readResponse) {
+    var event = eventGenerator.makeEvent(readResponse);
+    collector.collect(event);
+    this.resetIdlePulls();
+  }
+
+  private void handleFailingPlcRead(Exception e) {
+    // ensure that the cached connection manager removes the broken connection
+    if (connectionManager instanceof CachedPlcConnectionManager) {
+      ((CachedPlcConnectionManager) 
connectionManager).removeCachedConnection(settings.connectionString());
     }
+
+    // Increase backoff counter on failure
+    if (idlePullsBeforeNextAttempt == 0) {
+      idlePullsBeforeNextAttempt = 1;
+    } else {
+      idlePullsBeforeNextAttempt = Math.min(idlePullsBeforeNextAttempt * 2, 
MAX_IDLE_PULLS);
+    }
+
+    LOG.error(
+        "Error while reading from PLC with connection string {}. Setting 
adapter to idle for {} attemtps. {} ",
+        settings.connectionString(), idlePullsBeforeNextAttempt, e.getMessage()
+    );
+
+    currentIdlePulls = 0;
+  }
+
+  private void idleRead() {
+    LOG.debug("Skipping pullData call for {}. Idle pulls left: {}",
+              settings.connectionString(), idlePullsBeforeNextAttempt - 
currentIdlePulls);
+    currentIdlePulls++;
+  }
+
+  private void resetIdlePulls() {
+    idlePullsBeforeNextAttempt = 0;
+    currentIdlePulls = 0;
   }
 
   @Override

Reply via email to