This is an automated email from the ASF dual-hosted git repository.

ldywicki pushed a commit to branch feature/socketcan-0.8-preparations
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit a314c4034443545aa169422d0abd3c9f29b3d0cd
Author: Łukasz Dywicki <[email protected]>
AuthorDate: Sun Nov 8 01:15:24 2020 +0100

    Further work on stabilizing CANopen implementation.
---
 .../api/conversation/canopen/CANConversation.java  |   2 +
 .../canopen/CANOpenConversationBase.java           |  81 ++++++++++-
 .../canopen/SDODownloadConversation.java           |  85 +++++++-----
 .../canopen/SDOUploadConversation.java             |  79 ++++++-----
 .../java/can/canopen/CANOpenAbortException.java    |  23 ++++
 .../java/can/protocol/CANOpenProtocolLogic.java    |  54 ++++++--
 .../java/can/socketcan/SocketCANConversation.java  |   4 +
 .../resources/testsuite/CANOpenDriverSDOIT.xml     | 149 +++++++++++++++++++++
 8 files changed, 389 insertions(+), 88 deletions(-)

diff --git 
a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java
 
b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java
index 67dd41b..5676ff8 100644
--- 
a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java
+++ 
b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java
@@ -14,5 +14,7 @@ public interface CANConversation<W extends CANOpenFrame> {
 
     SendRequestContext<W> send(W frame);
 
+    void sendToWire(W frame);
+
 }
 
diff --git 
a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversationBase.java
 
b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversationBase.java
index d3a41ae..bdfc38e 100644
--- 
a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversationBase.java
+++ 
b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversationBase.java
@@ -1,20 +1,24 @@
 package org.apache.plc4x.java.can.api.conversation.canopen;
 
+import io.vavr.control.Either;
+import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.value.PlcValue;
 import org.apache.plc4x.java.can.canopen.CANOpenFrame;
-import org.apache.plc4x.java.canopen.readwrite.CANOpenSDORequest;
-import org.apache.plc4x.java.canopen.readwrite.SDORequest;
+import org.apache.plc4x.java.canopen.readwrite.*;
 import org.apache.plc4x.java.canopen.readwrite.io.DataItemIO;
 import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType;
 import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService;
 import org.apache.plc4x.java.spi.generation.ParseException;
 import org.apache.plc4x.java.spi.generation.ReadBuffer;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+
 public abstract class CANOpenConversationBase {
 
     protected final CANConversation<CANOpenFrame> delegate;
     protected final int nodeId;
-    private final int answerNodeId;
+    protected final int answerNodeId;
 
     public CANOpenConversationBase(CANConversation<CANOpenFrame> delegate, int 
nodeId, int answerNodeId) {
         this.delegate = delegate;
@@ -26,8 +30,27 @@ public abstract class CANOpenConversationBase {
         return DataItemIO.staticParse(new ReadBuffer(data, true), type, 
length);
     }
 
-    protected boolean isTransmitSDOFromReceiver(CANOpenFrame frame) {
-        return frame.getNodeId() == answerNodeId && frame.getService() == 
CANOpenService.TRANSMIT_SDO;
+    protected <T> void onError(CompletableFuture<T> receiver, 
CANOpenSDOResponse response, Throwable error) {
+        if (error != null) {
+            receiver.completeExceptionally(error);
+            return;
+        }
+
+        if (response.getResponse() instanceof SDOAbortResponse) {
+            SDOAbortResponse abort = (SDOAbortResponse) response.getResponse();
+            SDOAbort sdoAbort = abort.getAbort();
+            receiver.completeExceptionally(new PlcException("Could not read 
value. Remote party reported code " + sdoAbort.getCode()));
+        }
+    }
+
+    protected <X extends SDOResponse> Either<SDOAbort, X> unwrap(Class<X> 
payload, SDOResponse response) {
+        if (response instanceof SDOAbortResponse) {
+            return Either.left(((SDOAbortResponse) response).getAbort());
+        }
+        if (payload.isInstance(response)) {
+            return Either.right((X) response);
+        }
+        throw new RuntimeException("Unexpected payload kind " + response);
     }
 
     protected CANOpenFrame createFrame(SDORequest rq) {
@@ -38,4 +61,52 @@ public abstract class CANOpenConversationBase {
             .build();
     }
 
+    static class NodeIdPredicate implements Predicate<CANOpenFrame> {
+
+        private final int nodeId;
+
+        NodeIdPredicate(int nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public boolean test(CANOpenFrame frame) {
+            return frame.getNodeId() == nodeId && frame.getService() == 
CANOpenService.TRANSMIT_SDO;
+        }
+
+        @Override
+        public String toString() {
+            return "NodeIdPredicate [" + nodeId + "]";
+        }
+    }
+
+    static class TypePredicate<T, X> implements Predicate<X> {
+
+        private final Class<T> type;
+
+        public TypePredicate(Class<T> type) {
+            this.type = type;
+        }
+
+        @Override
+        public boolean test(X value) {
+            return type.isInstance(value);
+        }
+
+        public String toString() {
+            return "Type [" + type + "]";
+        }
+    }
+
+    static class TypeOrAbortPredicate<T extends SDOResponse> extends 
TypePredicate<T, SDOResponse> {
+
+        public TypeOrAbortPredicate(Class<T> type) {
+            super(type);
+        }
+
+        @Override
+        public boolean test(SDOResponse response) {
+            return super.test(response) || response instanceof 
SDOAbortResponse;
+        }
+    }
 }
diff --git 
a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDODownloadConversation.java
 
b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDODownloadConversation.java
index 689aec9..f887b10 100644
--- 
a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDODownloadConversation.java
+++ 
b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDODownloadConversation.java
@@ -1,14 +1,13 @@
 package org.apache.plc4x.java.can.api.conversation.canopen;
 
-import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.api.value.PlcValue;
+import org.apache.plc4x.java.can.canopen.CANOpenAbortException;
 import org.apache.plc4x.java.can.canopen.CANOpenFrame;
 import org.apache.plc4x.java.canopen.readwrite.*;
 import org.apache.plc4x.java.canopen.readwrite.io.DataItemIO;
 import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType;
-import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService;
 import org.apache.plc4x.java.canopen.readwrite.types.SDOResponseCommand;
 import org.apache.plc4x.java.spi.generation.ParseException;
 
@@ -37,17 +36,28 @@ public class SDODownloadConversation extends 
CANOpenConversationBase {
             // segmented
             SDOInitiateSegmentedUploadResponse size = new 
SDOInitiateSegmentedUploadResponse(data.length);
             delegate.send(createFrame(new SDOInitiateDownloadRequest(false, 
true, indexAddress, size)))
-                .check(this::isTransmitSDOFromReceiver)
+                .check(new NodeIdPredicate(answerNodeId))
                 .onTimeout(receiver::completeExceptionally)
                 .onError((response, error) -> 
receiver.completeExceptionally(error))
                 .unwrap(CANOpenFrame::getPayload)
                 .only(CANOpenSDOResponse.class)
                 .unwrap(CANOpenSDOResponse::getResponse)
-                .check(p -> p.getCommand() == 
SDOResponseCommand.INITIATE_DOWNLOAD)
-                .only(SDOInitiateDownloadResponse.class)
-                .check(p -> indexAddress.equals(p.getAddress()))
-                .handle(x -> {
-                    put(data, receiver, false, 0);
+                .check(new 
TypeOrAbortPredicate<>(SDOInitiateDownloadResponse.class))
+                .unwrap(payload -> unwrap(SDOInitiateDownloadResponse.class, 
payload))
+                .handle(either -> {
+                    if (either.isLeft()) {
+                        receiver.completeExceptionally(new 
CANOpenAbortException("Could not initiate upload", either.getLeft().getCode()));
+                    } else {
+                        SDOInitiateDownloadResponse response = either.get();
+                        if (response.getAddress().equals(indexAddress)) {
+                            put(data, receiver, false, 0);
+                        } else {
+                            // TODO find proper error code in spec
+                            SDOAbort abort = new SDOAbort(indexAddress, 1000);
+                            delegate.sendToWire(createFrame(new 
SDOAbortRequest(abort)));
+                            receiver.complete(PlcResponseCode.REMOTE_ERROR);
+                        }
+                    }
                 });
 
             return;
@@ -61,22 +71,25 @@ public class SDODownloadConversation extends 
CANOpenConversationBase {
         );
 
         delegate.send(createFrame(rq))
-            .check(this::isTransmitSDOFromReceiver)
+            .check(new NodeIdPredicate(answerNodeId))
             .onTimeout(receiver::completeExceptionally)
-            .onError((response, error) -> {
-                if (error != null) {
-                    receiver.completeExceptionally(error);
-                } else {
-                    receiver.completeExceptionally(new 
PlcException("Transaction terminated"));
-                }
-            })
             .unwrap(CANOpenFrame::getPayload)
             .only(CANOpenSDOResponse.class)
+            .onError((response, error) -> onError(receiver, response, error))
             .unwrap(CANOpenSDOResponse::getResponse)
-            .only(SDOInitiateDownloadResponse.class)
-            .check(r -> r.getCommand() == SDOResponseCommand.INITIATE_DOWNLOAD)
-            .handle(r -> {
-                receiver.complete(PlcResponseCode.OK);
+            .check(new 
TypeOrAbortPredicate<>(SDOInitiateDownloadResponse.class))
+            .unwrap(payload -> unwrap(SDOInitiateDownloadResponse.class, 
payload))
+            .handle(either -> {
+                if (either.isLeft()) {
+                    receiver.completeExceptionally(new 
CANOpenAbortException("Could not initiate upload", either.getLeft().getCode()));
+                } else {
+                    SDOInitiateDownloadResponse response = either.get();
+                    if (response.getCommand() == 
SDOResponseCommand.INITIATE_DOWNLOAD) {
+                        receiver.complete(PlcResponseCode.OK);
+                    } else {
+                        receiver.complete(PlcResponseCode.REMOTE_ERROR);
+                    }
+                }
             });
     }
 
@@ -86,25 +99,29 @@ public class SDODownloadConversation extends 
CANOpenConversationBase {
         System.arraycopy(data, offset, segment, 0, segment.length);
 
         delegate.send(createFrame(new SDOSegmentDownloadRequest(toggle, 
remaining <= 7, segment)))
-            .check(this::isTransmitSDOFromReceiver)
+            .check(new NodeIdPredicate(answerNodeId))
             .onTimeout(receiver::completeExceptionally)
             .unwrap(CANOpenFrame::getPayload)
             .only(CANOpenSDOResponse.class)
+            .onError((response, error) -> onError(receiver, response, error))
             .unwrap(CANOpenSDOResponse::getResponse)
-            .only(SDOSegmentDownloadResponse.class)
-            .onError((response, error) -> {
-                if (error != null) {
-                    receiver.completeExceptionally(error);
+            .check(new 
TypeOrAbortPredicate<>(SDOSegmentDownloadResponse.class))
+            .unwrap(payload -> unwrap(SDOSegmentDownloadResponse.class, 
payload))
+            .handle(either -> {
+                if (either.isLeft()) {
+                    return;
                 } else {
-                    receiver.completeExceptionally(new 
PlcException("Transaction terminated"));
-                }
-            })
-            .check(response -> response.getToggle() == toggle)
-            .handle(reply -> {
-                if (offset + segment.length == data.length) {
-                    receiver.complete(PlcResponseCode.OK);
-                } else {
-                    put(data, receiver, !toggle, offset + segment.length);
+                    SDOSegmentDownloadResponse response = either.get();
+                    if (response.getToggle() != toggle) {
+                        receiver.complete(PlcResponseCode.REMOTE_ERROR);
+                        return;
+                    }
+
+                    if (offset + segment.length == data.length) {
+                        receiver.complete(PlcResponseCode.OK);
+                    } else {
+                        put(data, receiver, !toggle, offset + segment.length);
+                    }
                 }
             });
     }
diff --git 
a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java
 
b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java
index 7e0b3b0..ab87926 100644
--- 
a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java
+++ 
b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java
@@ -3,6 +3,7 @@ package org.apache.plc4x.java.can.api.conversation.canopen;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.value.PlcValue;
 import org.apache.plc4x.java.can.api.segmentation.accumulator.ByteStorage;
+import org.apache.plc4x.java.can.canopen.CANOpenAbortException;
 import org.apache.plc4x.java.can.canopen.CANOpenFrame;
 import org.apache.plc4x.java.canopen.readwrite.*;
 import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType;
@@ -29,26 +30,22 @@ public class SDOUploadConversation extends 
CANOpenConversationBase {
         SDOInitiateUploadRequest rq = new SDOInitiateUploadRequest(address);
 
         delegate.send(createFrame(rq))
-            .check(this::isTransmitSDOFromReceiver)
+            .check(new NodeIdPredicate(answerNodeId))
             .onTimeout(receiver::completeExceptionally)
             .unwrap(CANOpenFrame::getPayload)
             .only(CANOpenSDOResponse.class)
-            .onError((response, error) -> {
-                if (error != null) {
-                    receiver.completeExceptionally(error);
-                    return;
-                }
-                if (response.getResponse() instanceof SDOAbortResponse) {
-                    SDOAbortResponse abort = (SDOAbortResponse) 
response.getResponse();
-                    SDOAbort sdoAbort = abort.getAbort();
-                    receiver.completeExceptionally(new PlcException("Could not 
read value. Remote party reported code " + sdoAbort.getCode()));
-                }
-            })
+            .onError((payload, error) -> onError(receiver, payload, error))
             .unwrap(CANOpenSDOResponse::getResponse)
-            .only(SDOInitiateUploadResponse.class)
-            .check(response -> response.getAddress().equals(address))
-            .handle(response -> {
-                handle(receiver, response);
+            .check(new TypeOrAbortPredicate<>(SDOInitiateUploadResponse.class))
+            .unwrap(payload -> unwrap(SDOInitiateUploadResponse.class, 
payload))
+            .check(either -> either.isLeft() || 
either.get().getAddress().equals(address))
+            .handle(either -> {
+                if (either.isLeft()) {
+                    SDOAbort abort = either.getLeft();
+                    receiver.completeExceptionally(new 
CANOpenAbortException("Could not complete operation", abort.getCode()));
+                } else {
+                    handle(receiver, either.get());
+                }
             });
     }
 
@@ -80,35 +77,37 @@ public class SDOUploadConversation extends 
CANOpenConversationBase {
     private void fetch(ByteStorage.SDOUploadStorage storage, 
BiConsumer<Integer, byte[]> valueCallback, CompletableFuture<PlcValue> 
receiver, boolean toggle, int size) {
         logger.info("Request next data block for address {}/{}", 
Integer.toHexString(address.getIndex()), 
Integer.toHexString(address.getSubindex()));
         delegate.send(createFrame(new SDOSegmentUploadRequest(toggle)))
-            .check(this::isTransmitSDOFromReceiver)
+            .check(new NodeIdPredicate(answerNodeId))
             .onTimeout(receiver::completeExceptionally)
             .unwrap(CANOpenFrame::getPayload)
             .only(CANOpenSDOResponse.class)
+            .onError((payload, error) -> onError(receiver, payload, error))
             .unwrap(CANOpenSDOResponse::getResponse)
-            .onError((response, error) -> {
-                if (error != null) {
-                    receiver.completeExceptionally(error);
-                    return;
-                }
-
-                if (response instanceof SDOAbortResponse) {
-                    SDOAbortResponse abort = (SDOAbortResponse) response;
-                    SDOAbort sdoAbort = abort.getAbort();
-                    receiver.completeExceptionally(new PlcException("Could not 
read value. Remote party reported code " + sdoAbort.getCode()));
-                }
-            })
-            .only(SDOSegmentUploadResponse.class)
-            .check(r -> r.getToggle() == toggle)
-            .handle(response -> {
-                storage.append(response);
-
-                if (response.getLast()) {
-                    // validate size
-                    logger.trace("Completed reading of data from {}/{}, 
collected {}, wanted {}", Integer.toHexString(address.getIndex()), 
Integer.toHexString(address.getSubindex()), storage.size(), size);
-                    valueCallback.accept(Long.valueOf(size).intValue(), 
storage.get());
+            .check(new TypeOrAbortPredicate<>(SDOSegmentUploadResponse.class))
+            .unwrap(payload -> unwrap(SDOSegmentUploadResponse.class, payload))
+            .handle(either -> {
+                if (either.isLeft()) {
+                    SDOAbort abort = either.getLeft();
+                    receiver.completeExceptionally(new 
CANOpenAbortException("Could not complete operation", abort.getCode()));
                 } else {
-                    logger.trace("Continue reading of data from {}/{}, 
collected {}, wanted {}", Integer.toHexString(address.getIndex()), 
Integer.toHexString(address.getSubindex()), storage.size(), size);
-                    fetch(storage, valueCallback, receiver, !toggle, size);
+                    SDOSegmentUploadResponse response = either.get();
+                    if (response.getToggle() != toggle) {
+                        // TODO find proper error code in specs
+                        receiver.completeExceptionally(new 
CANOpenAbortException("Remote operation failed", 1000));
+                        SDOAbort abort = new SDOAbort(address, 1000);
+                        delegate.sendToWire(createFrame(new 
SDOAbortRequest(abort)));
+                        return;
+                    }
+
+                    storage.append(either.get());
+                    if (response.getLast()) {
+                        // validate size
+                        logger.trace("Completed reading of data from {}/{}, 
collected {}, wanted {}", Integer.toHexString(address.getIndex()), 
Integer.toHexString(address.getSubindex()), storage.size(), size);
+                        valueCallback.accept(Long.valueOf(size).intValue(), 
storage.get());
+                    } else {
+                        logger.trace("Continue reading of data from {}/{}, 
collected {}, wanted {}", Integer.toHexString(address.getIndex()), 
Integer.toHexString(address.getSubindex()), storage.size(), size);
+                        fetch(storage, valueCallback, receiver, !toggle, size);
+                    }
                 }
             });
     }
diff --git 
a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/canopen/CANOpenAbortException.java
 
b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/canopen/CANOpenAbortException.java
new file mode 100644
index 0000000..6ccdcf2
--- /dev/null
+++ 
b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/canopen/CANOpenAbortException.java
@@ -0,0 +1,23 @@
+package org.apache.plc4x.java.can.canopen;
+
+import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
+
+public class CANOpenAbortException extends PlcProtocolException {
+
+    private final long abortCode;
+
+    public CANOpenAbortException(String message, long abortCode) {
+        super(message);
+        this.abortCode = abortCode;
+    }
+
+    public CANOpenAbortException(Throwable cause, long abortCode) {
+        super(cause);
+        this.abortCode = abortCode;
+    }
+
+    public long getAbortCode() {
+        return abortCode;
+    }
+
+}
diff --git 
a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java
 
b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java
index 830656d..e8c9a0d 100644
--- 
a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java
+++ 
b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java
@@ -19,7 +19,13 @@ under the License.
 package org.apache.plc4x.java.can.protocol;
 
 import org.apache.commons.codec.binary.Hex;
-import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
@@ -29,6 +35,8 @@ import org.apache.plc4x.java.api.value.PlcNull;
 import org.apache.plc4x.java.api.value.PlcStruct;
 import org.apache.plc4x.java.api.value.PlcUSINT;
 import org.apache.plc4x.java.api.value.PlcValue;
+import org.apache.plc4x.java.api.value.PlcValues;
+import org.apache.plc4x.java.can.canopen.CANOpenAbortException;
 import org.apache.plc4x.java.can.canopen.CANOpenFrame;
 import org.apache.plc4x.java.can.api.conversation.canopen.CANConversation;
 import 
org.apache.plc4x.java.can.api.conversation.canopen.SDODownloadConversation;
@@ -37,9 +45,18 @@ import 
org.apache.plc4x.java.can.canopen.CANOpenFrameBuilderFactory;
 import 
org.apache.plc4x.java.can.canopen.socketcan.CANOpenSocketCANFrameBuilder;
 import org.apache.plc4x.java.can.configuration.CANConfiguration;
 import org.apache.plc4x.java.can.context.CANOpenDriverContext;
-import org.apache.plc4x.java.can.field.*;
+import org.apache.plc4x.java.can.field.CANOpenField;
+import org.apache.plc4x.java.can.field.CANOpenHeartbeatField;
+import org.apache.plc4x.java.can.field.CANOpenNMTField;
+import org.apache.plc4x.java.can.field.CANOpenPDOField;
+import org.apache.plc4x.java.can.field.CANOpenSDOField;
 import org.apache.plc4x.java.can.socketcan.SocketCANConversation;
-import org.apache.plc4x.java.canopen.readwrite.*;
+import org.apache.plc4x.java.canopen.readwrite.CANOpenHeartbeatPayload;
+import org.apache.plc4x.java.canopen.readwrite.CANOpenNetworkPayload;
+import org.apache.plc4x.java.canopen.readwrite.CANOpenPDO;
+import org.apache.plc4x.java.canopen.readwrite.CANOpenPDOPayload;
+import org.apache.plc4x.java.canopen.readwrite.CANOpenPayload;
+import org.apache.plc4x.java.canopen.readwrite.IndexAddress;
 import org.apache.plc4x.java.canopen.readwrite.io.CANOpenPayloadIO;
 import org.apache.plc4x.java.canopen.readwrite.io.DataItemIO;
 import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService;
@@ -52,7 +69,14 @@ import org.apache.plc4x.java.spi.context.DriverContext;
 import org.apache.plc4x.java.spi.generation.ParseException;
 import org.apache.plc4x.java.spi.generation.ReadBuffer;
 import org.apache.plc4x.java.spi.generation.WriteBuffer;
-import org.apache.plc4x.java.spi.messages.*;
+import org.apache.plc4x.java.spi.messages.DefaultPlcReadResponse;
+import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionEvent;
+import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionResponse;
+import org.apache.plc4x.java.spi.messages.DefaultPlcWriteResponse;
+import org.apache.plc4x.java.spi.messages.InternalPlcReadRequest;
+import org.apache.plc4x.java.spi.messages.InternalPlcSubscriptionRequest;
+import org.apache.plc4x.java.spi.messages.InternalPlcWriteRequest;
+import org.apache.plc4x.java.spi.messages.PlcSubscriber;
 import org.apache.plc4x.java.spi.messages.utils.ResponseItem;
 import org.apache.plc4x.java.spi.model.DefaultPlcConsumerRegistration;
 import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle;
@@ -114,7 +138,7 @@ public class CANOpenProtocolLogic extends 
Plc4xProtocolBase<CANOpenFrame> implem
             if (configuration.isHeartbeat()) {
                 context.sendToWire(createFrame(new 
CANOpenHeartbeatPayload(NMTState.BOOTED_UP)));
 
-                this.heartbeat = new Timer();
+                this.heartbeat = new Timer("can-heartbeat");
                 this.heartbeat.scheduleAtFixedRate(new TimerTask() {
                     @Override
                     public void run() {
@@ -124,7 +148,7 @@ public class CANOpenProtocolLogic extends 
Plc4xProtocolBase<CANOpenFrame> implem
                             e.printStackTrace();
                         }
                     }
-                }, 5000, 5000);
+                }, 10000, 10000);
             }
             context.fireConnected();
         } catch (ParseException e) {
@@ -176,7 +200,11 @@ public class CANOpenProtocolLogic extends 
Plc4xProtocolBase<CANOpenFrame> implem
         CompletableFuture<PlcResponseCode> callback = new 
CompletableFuture<>();
         callback.whenComplete((code, error) -> {
             if (error != null) {
-                response.completeExceptionally(error);
+                if (error instanceof CANOpenAbortException) {
+                    response.complete(new 
DefaultPlcWriteResponse(writeRequest, Collections.singletonMap(fieldName, 
PlcResponseCode.REMOTE_ERROR)));
+                } else {
+                    response.complete(new 
DefaultPlcWriteResponse(writeRequest, Collections.singletonMap(fieldName, 
PlcResponseCode.INTERNAL_ERROR)));
+                }
                 transaction.endRequest();
                 return;
             }
@@ -185,7 +213,8 @@ public class CANOpenProtocolLogic extends 
Plc4xProtocolBase<CANOpenFrame> implem
         });
 
         PlcValue writeValue = writeRequest.getPlcValues().get(0);
-        SDODownloadConversation download = new 
SDODownloadConversation(conversation, field.getNodeId(), 
field.getAnswerNodeId(), new IndexAddress(field.getIndex(), 
field.getSubIndex()), writeValue, field.getCanOpenDataType());
+        SDODownloadConversation download = new 
SDODownloadConversation(conversation, field.getNodeId(), 
field.getAnswerNodeId(),
+            new IndexAddress(field.getIndex(), field.getSubIndex()), 
writeValue, field.getCanOpenDataType());
         transaction.submit(() -> download.execute(callback));
     }
 
@@ -272,8 +301,15 @@ public class CANOpenProtocolLogic extends 
Plc4xProtocolBase<CANOpenFrame> implem
         CompletableFuture<PlcValue> callback = new CompletableFuture<>();
         callback.whenComplete((value, error) -> {
             if (error != null) {
-                response.completeExceptionally(error);
+                Map<String, ResponseItem<PlcValue>> fields = new HashMap<>();
+                if (error instanceof CANOpenAbortException) {
+                    fields.put(fieldName, new 
ResponseItem<>(PlcResponseCode.REMOTE_ERROR, 
PlcValues.of(((CANOpenAbortException) error).getAbortCode())));
+                } else {
+                    fields.put(fieldName, new 
ResponseItem<>(PlcResponseCode.REMOTE_ERROR, null));
+                }
+                response.complete(new DefaultPlcReadResponse(readRequest, 
fields));
                 transaction.endRequest();
+
                 return;
             }
 
diff --git 
a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java
 
b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java
index 305ed68..1ba85f4 100644
--- 
a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java
+++ 
b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java
@@ -39,4 +39,8 @@ public class SocketCANConversation implements 
CANConversation<CANOpenFrame> {
             .expectResponse(CANOpenFrame.class, Duration.ofMillis(timeout));
     }
 
+    public void sendToWire(CANOpenFrame frame) {
+        context.sendToWire(frame);
+    }
+
 }
diff --git 
a/sandbox/test-java-can-driver/src/test/resources/testsuite/CANOpenDriverSDOIT.xml
 
b/sandbox/test-java-can-driver/src/test/resources/testsuite/CANOpenDriverSDOIT.xml
index f0e7e0d..1e1ddd3 100644
--- 
a/sandbox/test-java-can-driver/src/test/resources/testsuite/CANOpenDriverSDOIT.xml
+++ 
b/sandbox/test-java-can-driver/src/test/resources/testsuite/CANOpenDriverSDOIT.xml
@@ -127,6 +127,82 @@
   </testcase>
 
   <testcase>
+    <name>Expedited SDO read request - short</name>
+    <description>
+      Single field read request for an UNSIGNED8 field which answers with 4 bytes of data; only the first byte is reported as the value.
+    </description>
+    <steps>
+      <api-request name="Receive Read Request from application">
+        <TestReadRequest 
className="org.apache.plc4x.test.driver.model.api.TestReadRequest">
+          <fields>
+            <field 
className="org.apache.plc4x.test.driver.model.api.TestField">
+              <name>sdo1</name>
+              <address>SDO:1:9186/1:UNSIGNED8</address>
+            </field>
+          </fields>
+        </TestReadRequest>
+      </api-request>
+      <outgoing-plc-message name="Send SDO Initialize Upload Request">
+        <CANOpenSocketCANFrame 
className="org.apache.plc4x.java.can.canopen.socketcan.CANOpenSocketCANFrame">
+          <nodeId>1</nodeId>
+          <service>RECEIVE_SDO</service>
+          <payload 
className="org.apache.plc4x.java.canopen.readwrite.CANOpenSDORequest">
+            <command>INITIATE_UPLOAD</command>
+            <request 
className="org.apache.plc4x.java.canopen.readwrite.SDOInitiateUploadRequest">
+              <address 
className="org.apache.plc4x.java.canopen.readwrite.IndexAddress">
+                <index>9186</index>
+                <subindex>1</subindex>
+              </address>
+            </request>
+          </payload>
+        </CANOpenSocketCANFrame>
+      </outgoing-plc-message>
+      <incoming-plc-message name="Receive SDO Initialize Upload Response">
+        <!-- expedited response from the requested node -->
+        <CANOpenSocketCANFrame 
className="org.apache.plc4x.java.can.canopen.socketcan.CANOpenSocketCANFrame">
+          <nodeId>1</nodeId>
+          <service>TRANSMIT_SDO</service>
+          <payload 
className="org.apache.plc4x.java.canopen.readwrite.CANOpenSDOResponse">
+            <command>INITIATE_UPLOAD</command>
+            <response 
className="org.apache.plc4x.java.canopen.readwrite.SDOInitiateUploadResponse">
+              <expedited>true</expedited>
+              <indicated>true</indicated>
+              <address 
className="org.apache.plc4x.java.canopen.readwrite.IndexAddress">
+                <index>9186</index>
+                <subindex>1</subindex>
+              </address>
+              <payload 
className="org.apache.plc4x.java.canopen.readwrite.SDOInitiateExpeditedUploadResponse">
+                <data>YXNkZg==</data>
+              </payload>
+            </response>
+          </payload>
+        </CANOpenSocketCANFrame>
+      </incoming-plc-message>
+      <api-response name="Report Read Response to application">
+        <DefaultPlcReadResponse 
className="org.apache.plc4x.java.spi.messages.DefaultPlcReadResponse">
+          <request 
className="org.apache.plc4x.java.spi.messages.DefaultPlcReadRequest">
+            <sdo1 className="org.apache.plc4x.java.can.field.CANOpenSDOField">
+              <nodeId>1</nodeId>
+              <index>9186</index>
+              <subIndex>1</subIndex>
+              <canOpenDataType>UNSIGNED8</canOpenDataType>
+              <answerNodeId>1</answerNodeId>
+            </sdo1>
+          </request>
+          <sdo1>
+            <code>OK</code>
+            <value className="org.apache.plc4x.java.api.value.PlcUSINT">
+              <object>java.lang.Short</object>
+              <object>97</object>
+            </value>
+          </sdo1>
+        </DefaultPlcReadResponse>
+      </api-response>
+      <delay>1000</delay>
+    </steps>
+  </testcase>
+
+  <testcase>
     <name>Expedited SDO read request - custom answer node id</name>
     <description>
       Single field read request which answers with 4 bytes of data.
@@ -419,4 +495,77 @@
     </steps>
   </testcase>
 
+  <testcase>
+    <name>Expedited SDO read request with abort answer</name>
+    <description>
+      Single field read request which is answered with an SDO abort; the abort code is reported as the field value with a REMOTE_ERROR response code.
+    </description>
+    <steps>
+      <api-request name="Receive Read Request from application">
+        <TestReadRequest 
className="org.apache.plc4x.test.driver.model.api.TestReadRequest">
+          <fields>
+            <field 
className="org.apache.plc4x.test.driver.model.api.TestField">
+              <name>sdo1</name>
+              <address>SDO:1:1000/22:UNSIGNED32</address>
+            </field>
+          </fields>
+        </TestReadRequest>
+      </api-request>
+      <outgoing-plc-message name="Send SDO Initialize Upload Request">
+        <CANOpenSocketCANFrame 
className="org.apache.plc4x.java.can.canopen.socketcan.CANOpenSocketCANFrame">
+          <nodeId>1</nodeId>
+          <service>RECEIVE_SDO</service>
+          <payload 
className="org.apache.plc4x.java.canopen.readwrite.CANOpenSDORequest">
+            <command>INITIATE_UPLOAD</command>
+            <request 
className="org.apache.plc4x.java.canopen.readwrite.SDOInitiateUploadRequest">
+              <address 
className="org.apache.plc4x.java.canopen.readwrite.IndexAddress">
+                <index>1000</index>
+                <subindex>22</subindex>
+              </address>
+            </request>
+          </payload>
+        </CANOpenSocketCANFrame>
+      </outgoing-plc-message>
+      <incoming-plc-message name="Receive SDO Abort Response">
+        <CANOpenSocketCANFrame 
className="org.apache.plc4x.java.can.canopen.socketcan.CANOpenSocketCANFrame">
+          <nodeId>1</nodeId>
+          <service>TRANSMIT_SDO</service>
+          <payload 
className="org.apache.plc4x.java.canopen.readwrite.CANOpenSDOResponse">
+            <command>ABORT</command>
+            <response 
className="org.apache.plc4x.java.canopen.readwrite.SDOAbortResponse">
+              <abort 
className="org.apache.plc4x.java.canopen.readwrite.SDOAbort">
+                <address 
className="org.apache.plc4x.java.canopen.readwrite.IndexAddress">
+                  <index>1001</index>
+                  <subindex>22</subindex>
+                </address>
+                <code>84082688</code>
+              </abort>
+            </response>
+          </payload>
+        </CANOpenSocketCANFrame>
+      </incoming-plc-message>
+      <api-response name="Report Read Response to application">
+        <DefaultPlcReadResponse 
className="org.apache.plc4x.java.spi.messages.DefaultPlcReadResponse">
+          <request 
className="org.apache.plc4x.java.spi.messages.DefaultPlcReadRequest">
+            <sdo1 className="org.apache.plc4x.java.can.field.CANOpenSDOField">
+              <nodeId>1</nodeId>
+              <index>1000</index>
+              <subIndex>22</subIndex>
+              <canOpenDataType>UNSIGNED32</canOpenDataType>
+              <answerNodeId>1</answerNodeId>
+            </sdo1>
+          </request>
+          <sdo1>
+            <code>REMOTE_ERROR</code>
+            <value className="org.apache.plc4x.java.api.value.PlcLINT">
+              <object>java.lang.Long</object>
+              <object>84082688</object>
+            </value>
+          </sdo1>
+        </DefaultPlcReadResponse>
+      </api-response>
+      <delay>1000</delay>
+    </steps>
+  </testcase>
+
 </test:driver-testsuite>
\ No newline at end of file

Reply via email to