This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 53cc3a1f75 fix(#3727): Implement failure and recovery strategy in ContinuousPlcRequestReader (#3728)
53cc3a1f75 is described below
commit 53cc3a1f75db4b9c7dbb8e6c355fe60ef815fc1b
Author: Philipp Zehnder <[email protected]>
AuthorDate: Wed Aug 13 13:33:55 2025 +0200
fix(#3727): Implement failure and recovery strategy in ContinuousPlcRequestReader (#3728)
---
.../connection/ContinuousPlcRequestReader.java | 78 ++++++++++++++++++----
1 file changed, 65 insertions(+), 13 deletions(-)
diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
index b75cafc7bd..639f4e0534 100644
--- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
+++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
@@ -26,6 +26,7 @@ import org.apache.streampipes.extensions.management.connect.adapter.util.Polling
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.PlcConnectionManager;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,32 +37,83 @@ public class ContinuousPlcRequestReader
extends OneTimePlcRequestReader implements IPullAdapter {
private static final Logger LOG =
LoggerFactory.getLogger(ContinuousPlcRequestReader.class);
+ private static final int MAX_IDLE_PULLS = 300;
private final IEventCollector collector;
+ private int idlePullsBeforeNextAttempt = 0;
+ private int currentIdlePulls = 0;
- public ContinuousPlcRequestReader(PlcConnectionManager connectionManager,
- Plc4xConnectionSettings settings,
- PlcRequestProvider requestProvider,
- IEventCollector collector) {
+ /**
+ * Failure and recovery strategy:
+ * - If a read fails, the number of idle pulls before the next attempt is doubled, up to a maximum of 300.
+ * - If the read is successful, the idle pull counter is reset.
+ */
+ public ContinuousPlcRequestReader(
+ PlcConnectionManager connectionManager,
+ Plc4xConnectionSettings settings,
+ PlcRequestProvider requestProvider,
+ IEventCollector collector
+ ) {
super(connectionManager, settings, requestProvider);
this.collector = collector;
}
@Override
public void pullData() throws RuntimeException {
+ if (currentIdlePulls < idlePullsBeforeNextAttempt) {
+ idleRead();
+ } else {
+ connectAndReadPlcData();
+ }
+ }
+
+ private void connectAndReadPlcData() {
try (PlcConnection plcConnection =
connectionManager.getConnection(settings.connectionString())) {
var readRequest = requestProvider.makeReadRequest(plcConnection,
settings.nodes());
- var readResponse = readRequest.execute().get(5000, TimeUnit.MILLISECONDS);
- var event = eventGenerator.makeEvent(readResponse);
- collector.collect(event);
+ var readResponse = readRequest.execute()
+ .get(5000, TimeUnit.MILLISECONDS);
+ processPlcReadResponse(readResponse);
} catch (Exception e) {
- // ensure that the cached connection manager removes the broken connection
- if (connectionManager instanceof CachedPlcConnectionManager) {
- ((CachedPlcConnectionManager) connectionManager).removeCachedConnection(settings.connectionString());
- }
- LOG.error("Error while reading from PLC with connection string {}: {} ",
- settings.connectionString(), e.getMessage());
+ handleFailingPlcRead(e);
+ }
+ }
+
+ private void processPlcReadResponse(PlcReadResponse readResponse) {
+ var event = eventGenerator.makeEvent(readResponse);
+ collector.collect(event);
+ this.resetIdlePulls();
+ }
+
+ private void handleFailingPlcRead(Exception e) {
+ // ensure that the cached connection manager removes the broken connection
+ if (connectionManager instanceof CachedPlcConnectionManager) {
+ ((CachedPlcConnectionManager) connectionManager).removeCachedConnection(settings.connectionString());
}
+
+ // Increase backoff counter on failure
+ if (idlePullsBeforeNextAttempt == 0) {
+ idlePullsBeforeNextAttempt = 1;
+ } else {
+ idlePullsBeforeNextAttempt = Math.min(idlePullsBeforeNextAttempt * 2, MAX_IDLE_PULLS);
+ }
+
+ LOG.error(
+ "Error while reading from PLC with connection string {}. Setting adapter to idle for {} attemtps. {} ",
+ settings.connectionString(), idlePullsBeforeNextAttempt, e.getMessage()
+ );
+
+ currentIdlePulls = 0;
+ }
+
+ private void idleRead() {
+ LOG.debug("Skipping pullData call for {}. Idle pulls left: {}",
+ settings.connectionString(), idlePullsBeforeNextAttempt - currentIdlePulls);
+ currentIdlePulls++;
+ }
+
+ private void resetIdlePulls() {
+ idlePullsBeforeNextAttempt = 0;
+ currentIdlePulls = 0;
}
@Override