This is an automated email from the ASF dual-hosted git repository. ldywicki pushed a commit to branch feature/socketcan-0.8-preparations in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 9f5a4b21a52b0d0193f9dacd2ea4f2d4ca7dfbc6 Author: Ćukasz Dywicki <[email protected]> AuthorDate: Fri Oct 9 11:46:04 2020 +0200 Working on SDO upload (read) operation coordination. --- .../spi/transaction/RequestTransactionManager.java | 23 +++++- .../src/main/resources/protocols/can/canopen.mspec | 9 +- sandbox/test-java-can-driver/pom.xml | 4 + .../api/conversation/canopen/CANConversation.java | 4 +- .../conversation/canopen/CANOpenConversation.java | 14 +++- .../canopen/SDOUploadConversation.java | 95 ++++++++++++++++------ .../java/can/configuration/CANConfiguration.java | 6 +- .../plc4x/java/can/field/CANOpenPDOField.java | 19 ++++- .../java/can/protocol/CANOpenProtocolLogic.java | 76 ++++++++++++----- .../can/protocol/CANOpenSubscriptionHandle.java | 17 +++- .../java/can/socketcan/SocketCANConversation.java | 32 ++++---- 11 files changed, 224 insertions(+), 75 deletions(-) diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java index e49953c..5fb2ed5 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/transaction/RequestTransactionManager.java @@ -21,6 +21,7 @@ package org.apache.plc4x.java.spi.transaction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import java.util.Objects; import java.util.Queue; @@ -173,7 +174,8 @@ public class RequestTransactionManager { } public void submit(Runnable operation) { - this.setOperation(operation); + logger.trace("Submission of transaction {}", transactionId); + this.setOperation(new TransactionOperation(transactionId, operation)); this.parent.submit(this); } @@ -189,6 +191,25 @@ public class RequestTransactionManager { public int hashCode() { return Objects.hash(transactionId); } + } + static class TransactionOperation implements Runnable { + private final int transactionId; + private final Runnable delegate; + + public TransactionOperation(int transactionId, Runnable delegate) { + this.transactionId = transactionId; + this.delegate = delegate; + } + + @Override + public void run() { + try (final MDC.MDCCloseable closeable = MDC.putCloseable("plc4x.transactionId", Integer.toString(transactionId))) { + logger.trace("Start execution of transaction {}", transactionId); + delegate.run(); + logger.trace("Completed execution of transaction {}", transactionId); + } + } + } } diff --git a/protocols/can/src/main/resources/protocols/can/canopen.mspec b/protocols/can/src/main/resources/protocols/can/canopen.mspec index 10afdf9..7886b88 100644 --- a/protocols/can/src/main/resources/protocols/can/canopen.mspec +++ b/protocols/can/src/main/resources/protocols/can/canopen.mspec @@ -353,4 +353,11 @@ ] ] ] -] \ No newline at end of file +] + +// utility type quickly write data for mapped/manufacturer PDOs +[type 'CANOpenMPDO' + [simple uint 8 'node'] + [simple IndexAddress 'address'] + [array int 8 'data' COUNT '4'] +] diff --git a/sandbox/test-java-can-driver/pom.xml b/sandbox/test-java-can-driver/pom.xml index e3e72c4..4ed0eaa 100644 --- a/sandbox/test-java-can-driver/pom.xml +++ b/sandbox/test-java-can-driver/pom.xml @@ -145,6 +145,10 @@ <artifactId>jackson-dataformat-xml</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </dependency> </dependencies> </project> diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java index 6df8b0c..be4675b 100644 --- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java +++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java @@ -9,9 +9,11 @@ import org.apache.plc4x.java.spi.transaction.RequestTransactionManager.RequestTr public interface CANConversation<W extends CANFrame> { + int getNodeId(); + CANFrameBuilder<W> frameBuilder(); - void send(W frame, BiConsumer<RequestTransaction, SendRequestContext<W>> callback); + void send(RequestTransaction transaction, W frame, BiConsumer<RequestTransaction, SendRequestContext<W>> callback); } diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java index abe6aa0..fee418d 100644 --- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java +++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java @@ -9,16 +9,23 @@ import org.apache.plc4x.java.spi.ConversationContext.SendRequestContext; import org.apache.plc4x.java.spi.generation.ParseException; import org.apache.plc4x.java.spi.generation.ReadBuffer; import org.apache.plc4x.java.spi.generation.WriteBuffer; +import org.apache.plc4x.java.spi.transaction.RequestTransactionManager; import org.apache.plc4x.java.spi.transaction.RequestTransactionManager.RequestTransaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; public class CANOpenConversation<W extends CANFrame> { + private final Logger logger = LoggerFactory.getLogger(CANOpenConversation.class); + private final RequestTransaction transaction; private final int node; private final CANConversation<W> delegate; - public CANOpenConversation(int node, CANConversation<W> delegate) { + public CANOpenConversation(RequestTransaction transaction, int node, CANConversation<W> delegate) { + this.transaction = transaction; this.node = node; this.delegate = delegate; } @@ -30,7 +37,8 @@ public class CANOpenConversation<W extends CANFrame> { public void send(CANOpenService service, CANOpenPayload payload, BiConsumer<RequestTransaction, SendRequestContext<CANOpenPayload>> callback) { CANFrameBuilder<W> builder = delegate.frameBuilder(); W frame = builder.node(service.getMin() + node).data(serialize(payload)).build(); - delegate.send(frame, (tx, ctx) -> { + logger.info("Request data under transaction {}", transaction); + delegate.send(transaction, frame, (tx, ctx) -> { SendRequestContext<CANOpenPayload> unwrap = ctx // .onError((response, error) -> { // System.err.println("Unexpected frame " + response + " " + error); @@ -38,6 +46,8 @@ public class CANOpenConversation<W extends CANFrame> { .unwrap(CANOpenConversation.this::deserialize); callback.accept(tx, unwrap); }); + + } private CANOpenPayload deserialize(CANFrame frame) { diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java index cb2d778..0bb0998 100644 --- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java +++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java @@ -7,11 +7,16 @@ import org.apache.plc4x.java.can.api.segmentation.accumulator.ByteStorage; import org.apache.plc4x.java.canopen.readwrite.*; import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType; import org.apache.plc4x.java.spi.generation.ParseException; +import org.apache.plc4x.java.spi.transaction.RequestTransactionManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; public class SDOUploadConversation<W extends CANFrame> extends CANOpenConversationBase { + private final Logger logger = LoggerFactory.getLogger(SDOUploadConversation.class); private final SDOConversation<W> delegate; private final IndexAddress address; private final CANOpenDataType type; @@ -22,39 +27,56 @@ public class SDOUploadConversation<W extends CANFrame> extends CANOpenConversati this.type = type; } - public void execute(BiConsumer<PlcValue, Throwable> receiver) throws PlcException { + public void execute(CompletableFuture<PlcValue> receiver) throws PlcException { SDOInitiateUploadRequest rq = new SDOInitiateUploadRequest(address); delegate.send(rq, (tx, ctx) -> - ctx -// .onError((response, error) -> { -// System.err.println("Unexpected frame " + response + " " + error); -// receiver.accept(null, error); -// }) - .unwrap(CANOpenSDOResponse::getResponse) - .onError(((response, error) -> { - if (response instanceof SDOAbortResponse) { - SDOAbortResponse abort = (SDOAbortResponse) response; + ctx.onError((response, error) -> { + System.err.println("Unexpected frame " + response + " " + error); + if (error != null) { + receiver.completeExceptionally(error); + return; + } + if (response.getResponse() instanceof SDOAbortResponse) { + SDOAbortResponse abort = (SDOAbortResponse) response.getResponse(); SDOAbort sdoAbort = abort.getAbort(); - receiver.accept(null, new PlcException("Could not read value. Remote party reported code " + sdoAbort.getCode())); - } else { - receiver.accept(null, error); + receiver.completeExceptionally(new PlcException("Could not read value. Remote party reported code " + sdoAbort.getCode())); } - })) + }) + .check(reply -> { + logger.warn("Received answer {}", reply); + return true; + }) + .unwrap(CANOpenSDOResponse::getResponse).check(reply -> { + logger.warn("Received answer {}", reply); + return true; + }) + .check(reply -> { + logger.warn("Received answer {}", reply); + return true; + }) .only(SDOInitiateUploadResponse.class) - .check(r -> r.getAddress().equals(address)) + .check(resp -> { + logger.warn("Checking if reply address {}/{} matches {}/{}: {}", + Integer.toHexString(resp.getAddress().getIndex()), Integer.toHexString(resp.getAddress().getSubindex()), + Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()), + resp.getAddress().equals(address) + ); + return resp.getAddress().equals(address); + }) .handle(response -> { - handle(receiver, response); + handle(tx, receiver, response); }) ); } - private void handle(BiConsumer<PlcValue, Throwable> receiver, SDOInitiateUploadResponse answer) { + private void handle(RequestTransactionManager.RequestTransaction tx, CompletableFuture<PlcValue> receiver, SDOInitiateUploadResponse answer) { BiConsumer<Integer, byte[]> valueCallback = (length, bytes) -> { try { - receiver.accept(decodeFrom(bytes, type, length), null); - } catch (ParseException e) { - receiver.accept(null, e); + final PlcValue decodedValue = decodeFrom(bytes, type, length); + receiver.complete(decodedValue); + } catch (ArrayIndexOutOfBoundsException | ParseException e) { + receiver.completeExceptionally(e); } }; @@ -62,32 +84,51 @@ public class SDOUploadConversation<W extends CANFrame> extends CANOpenConversati SDOInitiateExpeditedUploadResponse payload = (SDOInitiateExpeditedUploadResponse) answer.getPayload(); valueCallback.accept(payload.getData().length, payload.getData()); } else if (answer.getPayload() instanceof SDOInitiateSegmentedUploadResponse) { + logger.info("Beginning of segmented operation for address {}/{}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex())); ByteStorage.SDOUploadStorage storage = new ByteStorage.SDOUploadStorage(); storage.append(answer); SDOInitiateSegmentedUploadResponse segment = (SDOInitiateSegmentedUploadResponse) answer.getPayload(); fetch(storage, valueCallback, receiver, false, Long.valueOf(segment.getBytes()).intValue()); } else { - receiver.accept(null, new PlcException("Unsupported SDO operation kind.")); + receiver.completeExceptionally(new PlcException("Unsupported SDO operation kind.")); } } - private void fetch(ByteStorage.SDOUploadStorage storage, BiConsumer<Integer, byte[]> valueCallback, BiConsumer<PlcValue, Throwable> receiver, boolean toggle, int size) { + private void fetch(ByteStorage.SDOUploadStorage storage, BiConsumer<Integer, byte[]> valueCallback, CompletableFuture<PlcValue> receiver, boolean toggle, int size) { + logger.info("Request next data block for address {}/{}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex())); delegate.send(new SDOSegmentUploadRequest(toggle), (tx, ctx) -> { ctx.unwrap(CANOpenSDOResponse::getResponse) - .only(SDOSegmentUploadResponse.class) .onError((response, error) -> { System.out.println("Unexpected frame " + response + " " + error); - receiver.accept(null, error); + if (error != null) { + receiver.completeExceptionally(error); + return; + } + + if (response instanceof SDOAbortResponse) { + SDOAbortResponse abort = (SDOAbortResponse) response; + SDOAbort sdoAbort = abort.getAbort(); + receiver.completeExceptionally(new PlcException("Could not read value. Remote party reported code " + sdoAbort.getCode())); + } }) + .only(SDOSegmentUploadResponse.class) .check(r -> r.getToggle() == toggle) - .handle(reply -> { - storage.append(reply); + .handle(response -> { +// if (!reply.getToggle() == toggle) { // toggle flag is wrong, abort transaction +// logger.info("Received invalid answer from party for {}", address); +// delegate.send(new SDOAbortRequest(new SDOAbort(address, 0x100)), (tx2, ctx2) -> {}); +// return; +// } + + storage.append(response); - if (reply.getLast()) { + if (response.getLast()) { // validate size + logger.trace("Completed reading of data from {}/{}, collected {}, wanted {}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()), storage.size(), size); valueCallback.accept(Long.valueOf(size).intValue(), storage.get()); } else { + logger.trace("Continue reading of data from {}/{}, collected {}, wanted {}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()), storage.size(), size); fetch(storage, valueCallback, receiver, !toggle, size); } }); diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java index fcb7194..af4188a 100644 --- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java +++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java @@ -28,7 +28,7 @@ public class CANConfiguration implements Configuration, CANTransportConfiguratio private int nodeId; @ConfigurationParameter - private boolean hearbeat; + private boolean heartbeat; public int getNodeId() { return nodeId; @@ -39,11 +39,11 @@ public class CANConfiguration implements Configuration, CANTransportConfiguratio } public boolean isHeartbeat() { - return hearbeat; + return heartbeat; } public void setHeartbeat(boolean heartbeat) { - this.hearbeat = heartbeat; + this.heartbeat = heartbeat; } } diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenPDOField.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenPDOField.java index f65e5b3..2761260 100644 --- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenPDOField.java +++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/field/CANOpenPDOField.java @@ -20,17 +20,21 @@ package org.apache.plc4x.java.can.field; import org.apache.plc4x.java.api.exceptions.PlcInvalidFieldException; import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType; +import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService; +import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; public class CANOpenPDOField extends CANOpenField { - public static final Pattern ADDRESS_PATTERN = Pattern.compile("PDO:" + CANOpenField.NODE_PATTERN + ":(?<canDataType>\\w+)(\\[(?<numberOfElements>\\d)])?"); + public static final Pattern ADDRESS_PATTERN = Pattern.compile("(?<pdo>(?:RECEIVE|TRANSMIT)_PDO_[1-4]):" + CANOpenField.NODE_PATTERN + ":(?<canDataType>\\w+)(\\[(?<numberOfElements>\\d)])?"); + private final CANOpenService service; private final CANOpenDataType canOpenDataType; - public CANOpenPDOField(int node, CANOpenDataType canOpenDataType) { + public CANOpenPDOField(int node, CANOpenService service, CANOpenDataType canOpenDataType) { super(node); + this.service = service; this.canOpenDataType = canOpenDataType; } @@ -55,10 +59,19 @@ public class CANOpenPDOField extends CANOpenField { Matcher matcher = getMatcher(addressString); int nodeId = Integer.parseInt(matcher.group("nodeId")); + String pdo = matcher.group("pdo"); + CANOpenService service = CANOpenService.valueOf(pdo); + if (service == null) { + throw new IllegalArgumentException("Invalid PDO detected " + pdo); + } + String canDataTypeString = matcher.group("canDataType"); CANOpenDataType canOpenDataType = CANOpenDataType.valueOf(canDataTypeString); - return new CANOpenPDOField(nodeId, canOpenDataType); + return new CANOpenPDOField(nodeId, service, canOpenDataType); } + public CANOpenService getService() { + return service; + } } diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java index d96fe19..e384d0a 100644 --- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java +++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java @@ -18,6 +18,8 @@ under the License. */ package org.apache.plc4x.java.can.protocol; +import org.apache.commons.codec.binary.Hex; +import org.apache.plc4x.java.api.exceptions.PlcRuntimeException; import org.apache.plc4x.java.api.messages.*; import org.apache.plc4x.java.api.model.PlcConsumerRegistration; import org.apache.plc4x.java.api.model.PlcField; @@ -38,7 +40,10 @@ import org.apache.plc4x.java.can.field.CANOpenField; import org.apache.plc4x.java.can.field.CANOpenPDOField; import org.apache.plc4x.java.can.field.CANOpenSDOField; import org.apache.plc4x.java.can.socketcan.SocketCANConversation; -import org.apache.plc4x.java.canopen.readwrite.*; +import org.apache.plc4x.java.canopen.readwrite.CANOpenHeartbeatPayload; +import org.apache.plc4x.java.canopen.readwrite.CANOpenPDOPayload; +import org.apache.plc4x.java.canopen.readwrite.CANOpenPayload; +import org.apache.plc4x.java.canopen.readwrite.IndexAddress; import org.apache.plc4x.java.canopen.readwrite.io.CANOpenHeartbeatPayloadIO; import org.apache.plc4x.java.canopen.readwrite.io.CANOpenPayloadIO; import org.apache.plc4x.java.canopen.readwrite.io.DataItemIO; @@ -61,6 +66,8 @@ import org.apache.plc4x.java.spi.transaction.RequestTransactionManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.time.Duration; import java.time.Instant; import java.util.Collection; @@ -91,8 +98,6 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl @Override public void setConfiguration(CANConfiguration configuration) { this.configuration = configuration; - // Set the transaction manager to allow only one message at a time. - this.tm = new RequestTransactionManager(1); } @Override @@ -135,7 +140,7 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl @Override public void setContext(ConversationContext<SocketCANFrame> context) { super.setContext(context); - this.conversation = new SocketCANConversation(tm, context); + this.conversation = new SocketCANConversation(configuration.getNodeId(), context); } private SocketCANFrame createFrame(CANOpenHeartbeatPayload state) throws ParseException { @@ -171,7 +176,8 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl } private void writeInternally(InternalPlcWriteRequest writeRequest, CANOpenSDOField field, CompletableFuture<PlcWriteResponse> response) { - CANOpenConversation<CANFrame> canopen = new CANOpenConversation<>(field.getNodeId(), conversation); + final RequestTransactionManager.RequestTransaction transaction = tm.startRequest(); + CANOpenConversation<CANFrame> canopen = new CANOpenConversation<>(transaction, field.getNodeId(), conversation); PlcValue writeValue = writeRequest.getPlcValues().get(0); @@ -180,6 +186,7 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl download.execute((value, error) -> { String fieldName = writeRequest.getFieldNames().iterator().next(); response.complete(new DefaultPlcWriteResponse(writeRequest, Collections.singletonMap(fieldName, PlcResponseCode.OK))); + transaction.endRequest(); }); } catch (Exception e) { response.completeExceptionally(e); @@ -191,10 +198,10 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl try { String fieldName = writeRequest.getFieldNames().iterator().next(); - // - WriteBuffer buffer = DataItemIO.staticSerialize(writeValue, field.getCanOpenDataType(), writeValue.getLength() / 8, true); + WriteBuffer buffer = DataItemIO.staticSerialize(writeValue, field.getCanOpenDataType(), writeValue.getLength(), true); if (buffer != null) { - context.sendToWire(new SocketCANFrame(field.getNodeId(), buffer.getData())); + int cob = field.getService().getMin() + field.getNodeId(); + context.sendToWire(new SocketCANFrame(cob, buffer.getData())); response.complete(new DefaultPlcWriteResponse(writeRequest, Collections.singletonMap(fieldName, PlcResponseCode.OK))); } else { response.complete(new DefaultPlcWriteResponse(writeRequest, Collections.singletonMap(fieldName, PlcResponseCode.INVALID_DATA))); @@ -252,16 +259,27 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl } private void readInternally(InternalPlcReadRequest readRequest, CANOpenSDOField field, CompletableFuture<PlcReadResponse> response) { - CANOpenConversation<CANFrame> canopen = new CANOpenConversation<>(field.getNodeId(), conversation); - - SDOUploadConversation<CANFrame> upload = canopen.sdo().upload(new IndexAddress(field.getIndex(), field.getSubIndex()), field.getCanOpenDataType()); try { - upload.execute((value, error) -> { + final RequestTransactionManager.RequestTransaction transaction = tm.startRequest(); + CANOpenConversation<CANFrame> canopen = new CANOpenConversation<>(transaction, field.getNodeId(), conversation); + System.out.println("----> Submit read " + field.getIndex() + "/" + field.getSubIndex() + " from " + field.getNodeId() + " " + transaction); + SDOUploadConversation<CANFrame> upload = canopen.sdo().upload(new IndexAddress(field.getIndex(), field.getSubIndex()), field.getCanOpenDataType()); + CompletableFuture<PlcValue> callback = new CompletableFuture<>(); + callback.whenComplete((value, error) -> { + System.out.println("<---- Received reply " + field.getIndex() + "/" + field.getSubIndex() + " from " + field.getNodeId() + " " + value + "/" + error + " " + transaction); + if (error != null) { + response.completeExceptionally(error); + transaction.endRequest(); + return; + } + String fieldName = readRequest.getFieldNames().iterator().next(); Map<String, ResponseItem<PlcValue>> fields = new HashMap<>(); fields.put(fieldName, new ResponseItem<>(PlcResponseCode.OK, value)); response.complete(new DefaultPlcReadResponse(readRequest, fields)); + transaction.endRequest(); }); + upload.execute(callback); } catch (Exception e) { response.completeExceptionally(e); } @@ -275,13 +293,23 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl CANOpenDriverContext.CALLBACK.receive(msg); if (service != null) { - logger.info("Decoded CANOpen {} from {}, message {}", service, Math.abs(service.getMin() - msg.getIdentifier()), payload); + int nodeId = Math.abs(service.getMin() - msg.getIdentifier()); if (service.getPdo() && payload instanceof CANOpenPDOPayload) { - logger.info("Broadcasting PDO to subscribers"); - publishEvent(msg.getIdentifier(), (CANOpenPDOPayload) payload); + publishEvent(service, nodeId, (CANOpenPDOPayload) payload); + } else { + String hex = ""; + if (logger.isInfoEnabled()) { + try { + final WriteBuffer buffer = new WriteBuffer(payload.getLengthInBytes(), true); + CANOpenPayloadIO.staticSerialize(buffer, payload); + hex = Hex.encodeHexString(buffer.getData()); + } catch (ParseException e) { + e.printStackTrace(); + } + } + logger.info("Decoded CANOpen {} from {}, message {}, {}", service, nodeId, payload, hex); } - } else { logger.info("CAN message {}, {}", msg.getIdentifier(), msg); } @@ -296,7 +324,8 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl // } } - private void publishEvent(int nodeId, CANOpenPDOPayload payload) { + private void publishEvent(CANOpenService service, int nodeId, CANOpenPDOPayload payload) { + CANOpenSubscriptionHandle dispatchedHandle = null; for (Map.Entry<DefaultPlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> entry : consumers.entrySet()) { DefaultPlcConsumerRegistration registration = entry.getKey(); Consumer<PlcSubscriptionEvent> consumer = entry.getValue(); @@ -305,7 +334,10 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl if (handler instanceof CANOpenSubscriptionHandle) { CANOpenSubscriptionHandle handle = (CANOpenSubscriptionHandle) handler; - if (handle.matches(nodeId)) { + if (handle.matches(service, nodeId)) { + logger.trace("Dispatching notification {} for node {} to {}", service, nodeId, handle); + dispatchedHandle = handle; + CANOpenPDOField field = handle.getField(); byte[] data = payload.getPdo().getData(); try { @@ -329,10 +361,15 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl ); consumer.accept(event); } + } else { } } } } + + if (dispatchedHandle == null) { + logger.trace("Could not find subscription matching {} and node {}", service, nodeId); + } } @Override @@ -362,8 +399,7 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<SocketCANFrame> impl private int cobId(int nodeId, CANOpenService service) { // form 32 bit socketcan identifier - return (nodeId << 24) & 0xff000000 | - (service.getValue() << 16 ) & 0x00ff0000; + return service.getMin() + nodeId; } private CANOpenService serviceId(int cobId) { diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenSubscriptionHandle.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenSubscriptionHandle.java index 07ecb3b..9bfd688 100644 --- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenSubscriptionHandle.java +++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenSubscriptionHandle.java @@ -1,6 +1,7 @@ package org.apache.plc4x.java.can.protocol; import org.apache.plc4x.java.can.field.CANOpenPDOField; +import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService; import org.apache.plc4x.java.spi.messages.PlcSubscriber; import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle; @@ -14,8 +15,11 @@ public class CANOpenSubscriptionHandle extends DefaultPlcSubscriptionHandle { this.field = field; } - public boolean matches(int identifier) { - return field.getNodeId() == 0 || field.getNodeId() == identifier; + public boolean matches(CANOpenService service, int identifier) { + if (field.getService() != service) { + return false; + } + return field.getNodeId() == identifier; } public String getName() { @@ -25,4 +29,13 @@ public class CANOpenSubscriptionHandle extends DefaultPlcSubscriptionHandle { public CANOpenPDOField getField() { return field; } + + public String toString() { + return "CANOpenSubscriptionHandle [service=" + field.getService() + ", node=" + intAndHex(field.getNodeId()) + ", cob=" + intAndHex(field.getService().getMin() + field.getNodeId()) + "]"; + } + + private static String intAndHex(int val) { + return val + "(0x" + Integer.toHexString(val) + ")"; + } + } diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java index 8b01c2a..c35843d 100644 --- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java +++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java @@ -7,7 +7,6 @@ import org.apache.plc4x.java.can.api.conversation.canopen.CANFrameBuilder; import org.apache.plc4x.java.socketcan.readwrite.SocketCANFrame; import org.apache.plc4x.java.spi.ConversationContext; import org.apache.plc4x.java.spi.ConversationContext.SendRequestContext; -import org.apache.plc4x.java.spi.transaction.RequestTransactionManager; import org.apache.plc4x.java.spi.transaction.RequestTransactionManager.RequestTransaction; import java.time.Duration; @@ -15,32 +14,35 @@ import java.util.function.BiConsumer; public class SocketCANConversation implements CANConversation<CANFrame> { - private final RequestTransactionManager tm; + private final int nodeId; private final ConversationContext<SocketCANFrame> context; - public SocketCANConversation(RequestTransactionManager tm, ConversationContext<SocketCANFrame> context) { - this.tm = tm; + public SocketCANConversation(int nodeId, ConversationContext<SocketCANFrame> context) { + this.nodeId = nodeId; this.context = context; } @Override + public int getNodeId() { + return nodeId; + } + + @Override public CANFrameBuilder<CANFrame> frameBuilder() { return new SocketCANFrameBuilder(); } @Override - public void send(CANFrame frame, BiConsumer<RequestTransaction, SendRequestContext<CANFrame>> callback) { + public void send(RequestTransaction transaction, CANFrame frame, BiConsumer<RequestTransaction, SendRequestContext<CANFrame>> callback) { if (frame instanceof SocketCANDelegateFrame) { - RequestTransactionManager.RequestTransaction transaction = tm.startRequest(); - - ConversationContext.SendRequestContext<CANFrame> ctx = context.sendRequest(((SocketCANDelegateFrame) frame).getFrame()) - .expectResponse(SocketCANFrame.class, Duration.ofSeconds(10L)) -// .onError((response, error) -> { -// System.err.println("Unexpected frame " + response + " " + error); -// }) - .unwrap(SocketCANDelegateFrame::new); - //return CompletableFuture.completedFuture(new SocketCANTransactionContext<>(transaction, ctx)); - callback.accept(transaction, ctx); + System.out.println("-----> Sending request frame " + transaction); + transaction.submit(() -> { + ConversationContext.SendRequestContext<CANFrame> ctx = context.sendRequest(((SocketCANDelegateFrame) frame).getFrame()) + .expectResponse(SocketCANFrame.class, Duration.ofSeconds(10L)) + .unwrap(SocketCANDelegateFrame::new); + System.out.println("-----> Frame been sent " + transaction); + callback.accept(transaction, ctx); + }); return; } throw new PlcRuntimeException("Unsupported frame type " + frame);
