This is an automated email from the ASF dual-hosted git repository. ldywicki pushed a commit to branch feature/socketcan-0.8-preparations in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 1ea4a1f626391f3a865b24b035d361b1640465d9 Author: Łukasz Dywicki <[email protected]> AuthorDate: Fri Nov 6 11:26:23 2020 +0100 Update (simplify) conversation logic, introduce timeouts and fix string serialization. --- .../socketcan/netty/SocketCANChannel.java | 19 ++++- .../src/main/resources/protocols/can/canopen.mspec | 4 +- .../api/conversation/canopen/CANConversation.java | 2 +- .../conversation/canopen/CANOpenConversation.java | 41 ---------- .../canopen/CANOpenConversationBase.java | 23 ++++++ .../api/conversation/canopen/SDOConversation.java | 44 ----------- .../canopen/SDODownloadConversation.java | 92 +++++++++++++--------- .../canopen/SDOUploadConversation.java | 75 ++++++++++-------- .../java/can/configuration/CANConfiguration.java | 13 +++ .../java/can/protocol/CANOpenProtocolLogic.java | 9 +-- .../java/can/socketcan/SocketCANConversation.java | 12 +-- 11 files changed, 159 insertions(+), 175 deletions(-) diff --git a/plc4j/transports/socketcan/src/main/java/org/apache/plc4x/java/transport/socketcan/netty/SocketCANChannel.java b/plc4j/transports/socketcan/src/main/java/org/apache/plc4x/java/transport/socketcan/netty/SocketCANChannel.java index f141e9e..afa4855 100644 --- a/plc4j/transports/socketcan/src/main/java/org/apache/plc4x/java/transport/socketcan/netty/SocketCANChannel.java +++ b/plc4j/transports/socketcan/src/main/java/org/apache/plc4x/java/transport/socketcan/netty/SocketCANChannel.java @@ -26,6 +26,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; import io.netty.channel.oio.OioByteStreamChannel; import org.apache.plc4x.java.transport.socketcan.netty.address.SocketCANAddress; import org.slf4j.Logger; @@ -115,16 +116,19 @@ public class SocketCANChannel extends OioByteStreamChannel { while (!isInputShutdown()) { ByteBuffer byteBuffer = ByteBuffer.allocateDirect(16); handle.readUnsafe(byteBuffer); + 
buffer.writeBytes(byteBuffer); +// CanFrame frame = handle.read(); +// System.out.println("Read frame " + frame); // frameBytes.writeBytes(frame.getBuffer()); // String dump = ByteBufUtil.prettyHexDump(frameBytes); // System.out.println(frame + "\n" + dump); - buffer.writeBytes(byteBuffer); +// buffer.writeBytes(frame.getBuffer()); } } catch (IOException e) { logger.warn("Could not read data", e); pipeline().fireExceptionCaught(e); } - }); + }, "javacan-reader"); loopThread.start(); activate(new CANInputStream(buffer), new CANOutputStream(handle)); @@ -170,6 +174,11 @@ public class SocketCANChannel extends OioByteStreamChannel { } @Override + protected boolean isCompatible(EventLoop loop) { + return super.isCompatible(loop); + } + + @Override public ChannelConfig config() { return this.config; } @@ -222,9 +231,15 @@ public class SocketCANChannel extends OioByteStreamChannel { if (buf.readableBytes() > 0) { return buf.readByte() & 0xFF; } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new IOException(e); + } } throw new SocketTimeoutException(); } + } diff --git a/protocols/can/src/main/resources/protocols/can/canopen.mspec b/protocols/can/src/main/resources/protocols/can/canopen.mspec index 7886b88..1cb60fd 100644 --- a/protocols/can/src/main/resources/protocols/can/canopen.mspec +++ b/protocols/can/src/main/resources/protocols/can/canopen.mspec @@ -335,13 +335,13 @@ ['CANOpenDataType.OCTET_STRING' String [manual string 'UTF-8' 'value' 'STATIC_CALL("org.apache.plc4x.java.can.helper.CANOpenHelper.parseString", io, size, _type.encoding)' - 'STATIC_CALL("org.apache.plc4x.java.can.helper.CANOpenHelper.serializeString", io, _value, _type.encoding)' '_value.length' + 'STATIC_CALL("org.apache.plc4x.java.can.helper.CANOpenHelper.serializeString", io, _value, _type.encoding)' '_value.length * 8' ] ] ['CANOpenDataType.VISIBLE_STRING' String [manual string 'UTF-8' 'value' 'STATIC_CALL("org.apache.plc4x.java.can.helper.CANOpenHelper.parseString", io, 
size, _type.encoding)' - 'STATIC_CALL("org.apache.plc4x.java.can.helper.CANOpenHelper.serializeString", io, _value, _type.encoding)' '_value.length' + 'STATIC_CALL("org.apache.plc4x.java.can.helper.CANOpenHelper.serializeString", io, _value, _type.encoding)' '_value.length * 8' ] ] //CANOpenDataType.TIME_OF_DAY' CANOpenTime diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java index 471e8a6..67dd41b 100644 --- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java +++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANConversation.java @@ -12,7 +12,7 @@ public interface CANConversation<W extends CANOpenFrame> { CANOpenFrameBuilder createBuilder(); - void send(W frame, Consumer<SendRequestContext<W>> callback); + SendRequestContext<W> send(W frame); } diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java deleted file mode 100644 index 9785fee..0000000 --- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversation.java +++ /dev/null @@ -1,41 +0,0 @@ -package org.apache.plc4x.java.can.api.conversation.canopen; - -import org.apache.plc4x.java.can.canopen.CANOpenFrame; -import org.apache.plc4x.java.canopen.readwrite.CANOpenPayload; -import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService; -import org.apache.plc4x.java.spi.ConversationContext.SendRequestContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.function.Consumer; - -public class CANOpenConversation { - - private final 
Logger logger = LoggerFactory.getLogger(CANOpenConversation.class); - private final int node; - private final CANConversation<CANOpenFrame> delegate; - - public CANOpenConversation(int node, CANConversation<CANOpenFrame> delegate) { - this.node = node; - this.delegate = delegate; - } - - public SDOConversation sdo() { - return new SDOConversation(this); - } - - public void send(CANOpenService service, CANOpenPayload payload, Consumer<SendRequestContext<CANOpenPayload>> callback) { - CANOpenFrame frame = delegate.createBuilder().withNodeId(node).withService(service).withPayload(payload).build(); - delegate.send(frame, (ctx) -> { - SendRequestContext<CANOpenPayload> unwrap = ctx -// .onError((response, error) -> { -// System.err.println("Unexpected frame " + response + " " + error); -// }) - .unwrap(CANOpenFrame::getPayload); - callback.accept(unwrap); - }); - - - } - -} diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversationBase.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversationBase.java index 0c65988..28a6941 100644 --- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversationBase.java +++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/CANOpenConversationBase.java @@ -2,15 +2,38 @@ package org.apache.plc4x.java.can.api.conversation.canopen; import org.apache.plc4x.java.api.value.PlcValue; import org.apache.plc4x.java.can.canopen.CANOpenFrame; +import org.apache.plc4x.java.canopen.readwrite.CANOpenSDORequest; +import org.apache.plc4x.java.canopen.readwrite.SDORequest; import org.apache.plc4x.java.canopen.readwrite.io.DataItemIO; import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType; +import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService; import 
org.apache.plc4x.java.spi.generation.ParseException; import org.apache.plc4x.java.spi.generation.ReadBuffer; public abstract class CANOpenConversationBase { + protected final CANConversation<CANOpenFrame> delegate; + protected final int nodeId; + + public CANOpenConversationBase(CANConversation<CANOpenFrame> delegate, int nodeId) { + this.delegate = delegate; + this.nodeId = nodeId; + } + protected PlcValue decodeFrom(byte[] data, CANOpenDataType type, int length) throws ParseException { return DataItemIO.staticParse(new ReadBuffer(data, true), type, length); } + protected boolean isTransmitSDOFromReceiver(CANOpenFrame frame) { + return frame.getNodeId() == nodeId && frame.getService() == CANOpenService.TRANSMIT_SDO; + } + + protected CANOpenFrame createFrame(SDORequest rq) { + return delegate.createBuilder() + .withNodeId(nodeId) + .withService(CANOpenService.RECEIVE_SDO) + .withPayload(new CANOpenSDORequest(rq.getCommand(), rq)) + .build(); + } + } diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOConversation.java deleted file mode 100644 index 251e4da..0000000 --- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOConversation.java +++ /dev/null @@ -1,44 +0,0 @@ -package org.apache.plc4x.java.can.api.conversation.canopen; - -import org.apache.plc4x.java.api.value.PlcValue; -import org.apache.plc4x.java.can.canopen.CANOpenFrame; -import org.apache.plc4x.java.canopen.readwrite.CANOpenSDORequest; -import org.apache.plc4x.java.canopen.readwrite.CANOpenSDOResponse; -import org.apache.plc4x.java.canopen.readwrite.IndexAddress; -import org.apache.plc4x.java.canopen.readwrite.SDORequest; -import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType; -import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService; -import 
org.apache.plc4x.java.spi.ConversationContext.SendRequestContext; -import org.apache.plc4x.java.spi.transaction.RequestTransactionManager.RequestTransaction; - -import java.util.function.BiConsumer; -import java.util.function.Consumer; - -public class SDOConversation { - - private final CANOpenConversation delegate; - - public SDOConversation(CANOpenConversation delegate) { - this.delegate = delegate; - } - - public SDODownloadConversation download(IndexAddress indexAddress, PlcValue value, CANOpenDataType type) { - return new SDODownloadConversation(this, indexAddress, value, type); - } - - public SDOUploadConversation upload(IndexAddress indexAddress, CANOpenDataType type) { - return new SDOUploadConversation(this, indexAddress, type); - } - - public void send(SDORequest request, Consumer<SendRequestContext<CANOpenSDOResponse>> callback) { - delegate.send(CANOpenService.RECEIVE_SDO, new CANOpenSDORequest(request.getCommand(), request), (ctx) -> { - SendRequestContext<CANOpenSDOResponse> context = ctx -// .onError((response, error) -> { -// System.out.println("Unexpected frame " + response + " " + error); -// }) - .only(CANOpenSDOResponse.class); - callback.accept(context); - }); - } - -} diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDODownloadConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDODownloadConversation.java index 8e90bb6..ba0bffb 100644 --- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDODownloadConversation.java +++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDODownloadConversation.java @@ -4,22 +4,24 @@ import org.apache.plc4x.java.api.exceptions.PlcException; import org.apache.plc4x.java.api.exceptions.PlcRuntimeException; import org.apache.plc4x.java.api.types.PlcResponseCode; import 
org.apache.plc4x.java.api.value.PlcValue; +import org.apache.plc4x.java.can.canopen.CANOpenFrame; import org.apache.plc4x.java.canopen.readwrite.*; import org.apache.plc4x.java.canopen.readwrite.io.DataItemIO; import org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType; +import org.apache.plc4x.java.canopen.readwrite.types.CANOpenService; import org.apache.plc4x.java.canopen.readwrite.types.SDOResponseCommand; import org.apache.plc4x.java.spi.generation.ParseException; import java.util.concurrent.CompletableFuture; -import java.util.function.BiConsumer; public class SDODownloadConversation extends CANOpenConversationBase { - private final SDOConversation delegate; + private final CANConversation<CANOpenFrame> delegate; private final IndexAddress indexAddress; private final byte[] data; - public SDODownloadConversation(SDOConversation delegate, IndexAddress indexAddress, PlcValue value, CANOpenDataType type) { + public SDODownloadConversation(CANConversation<CANOpenFrame> delegate, int nodeId, IndexAddress indexAddress, PlcValue value, CANOpenDataType type) { + super(delegate, nodeId); this.delegate = delegate; this.indexAddress = indexAddress; @@ -33,17 +35,20 @@ public class SDODownloadConversation extends CANOpenConversationBase { public void execute(CompletableFuture<PlcResponseCode> receiver) { if (data.length > 4) { // segmented - SDOInitiateSegmentedUploadResponse size = new SDOInitiateSegmentedUploadResponse(data.length); - delegate.send(new SDOInitiateDownloadRequest(false, true, indexAddress, size), (ctx) -> { - ctx.unwrap(CANOpenSDOResponse::getResponse) - .check(p -> p.getCommand() == SDOResponseCommand.INITIATE_DOWNLOAD) - .only(SDOInitiateDownloadResponse.class) - .check(p -> indexAddress.equals(p.getAddress())) - .handle(x -> { - put(data, receiver, false, 0); - }); - }); + delegate.send(createFrame(new SDOInitiateDownloadRequest(false, true, indexAddress, size))) + .check(this::isTransmitSDOFromReceiver) + 
.onTimeout(receiver::completeExceptionally) + .onError((response, error) -> receiver.completeExceptionally(error)) + .unwrap(CANOpenFrame::getPayload) + .only(CANOpenSDOResponse.class) + .unwrap(CANOpenSDOResponse::getResponse) + .check(p -> p.getCommand() == SDOResponseCommand.INITIATE_DOWNLOAD) + .only(SDOInitiateDownloadResponse.class) + .check(p -> indexAddress.equals(p.getAddress())) + .handle(x -> { + put(data, receiver, false, 0); + }); return; } @@ -55,17 +60,24 @@ public class SDODownloadConversation extends CANOpenConversationBase { new SDOInitiateExpeditedUploadResponse(data) ); - delegate.send(rq, (ctx) -> - ctx.onError((response, error) -> { - System.out.println("Unexpected frame " + response + " " + error); + delegate.send(createFrame(rq)) + .check(this::isTransmitSDOFromReceiver) + .onTimeout(receiver::completeExceptionally) + .onError((response, error) -> { + if (error != null) { + receiver.completeExceptionally(error); + } else { + receiver.completeExceptionally(new PlcException("Transaction terminated")); + } }) + .unwrap(CANOpenFrame::getPayload) + .only(CANOpenSDOResponse.class) .unwrap(CANOpenSDOResponse::getResponse) .only(SDOInitiateDownloadResponse.class) .check(r -> r.getCommand() == SDOResponseCommand.INITIATE_DOWNLOAD) .handle(r -> { - System.out.println(r); - }) - ); + receiver.complete(PlcResponseCode.OK); + }); } private void put(byte[] data, CompletableFuture<PlcResponseCode> receiver, boolean toggle, int offset) { @@ -73,24 +85,28 @@ public class SDODownloadConversation extends CANOpenConversationBase { byte[] segment = new byte[Math.min(remaining, 7)]; System.arraycopy(data, offset, segment, 0, segment.length); - delegate.send(new SDOSegmentDownloadRequest(toggle, remaining <= 7, segment), (ctx) -> { - ctx.unwrap(CANOpenSDOResponse::getResponse) - .only(SDOSegmentDownloadResponse.class) - .onError((response, error) -> { - if (error != null) { - receiver.completeExceptionally(error); - } else { - receiver.completeExceptionally(new 
PlcException("Transaction terminated")); - } - }) - .check(response -> response.getToggle() == toggle) - .handle(reply -> { - if (offset + segment.length == data.length) { - receiver.complete(PlcResponseCode.OK); - } else { - put(data, receiver, !toggle, offset + segment.length); - } - }); - }); + delegate.send(createFrame(new SDOSegmentDownloadRequest(toggle, remaining <= 7, segment))) + .check(this::isTransmitSDOFromReceiver) + .onTimeout(receiver::completeExceptionally) + .unwrap(CANOpenFrame::getPayload) + .only(CANOpenSDOResponse.class) + .unwrap(CANOpenSDOResponse::getResponse) + .only(SDOSegmentDownloadResponse.class) + .onError((response, error) -> { + if (error != null) { + receiver.completeExceptionally(error); + } else { + receiver.completeExceptionally(new PlcException("Transaction terminated")); + } + }) + .check(response -> response.getToggle() == toggle) + .handle(reply -> { + if (offset + segment.length == data.length) { + receiver.complete(PlcResponseCode.OK); + } else { + put(data, receiver, !toggle, offset + segment.length); + } + }); } + } diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java index 0beb17c..6be036a 100644 --- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java +++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/api/conversation/canopen/SDOUploadConversation.java @@ -3,6 +3,7 @@ package org.apache.plc4x.java.can.api.conversation.canopen; import org.apache.plc4x.java.api.exceptions.PlcException; import org.apache.plc4x.java.api.value.PlcValue; import org.apache.plc4x.java.can.api.segmentation.accumulator.ByteStorage; +import org.apache.plc4x.java.can.canopen.CANOpenFrame; import org.apache.plc4x.java.canopen.readwrite.*; import 
org.apache.plc4x.java.canopen.readwrite.types.CANOpenDataType; import org.apache.plc4x.java.spi.generation.ParseException; @@ -15,12 +16,11 @@ import java.util.function.BiConsumer; public class SDOUploadConversation extends CANOpenConversationBase { private final Logger logger = LoggerFactory.getLogger(SDOUploadConversation.class); - private final SDOConversation delegate; private final IndexAddress address; private final CANOpenDataType type; - public SDOUploadConversation(SDOConversation delegate, IndexAddress address, CANOpenDataType type) { - this.delegate = delegate; + public SDOUploadConversation(CANConversation<CANOpenFrame> delegate, int nodeId, IndexAddress address, CANOpenDataType type) { + super(delegate, nodeId); this.address = address; this.type = type; } @@ -28,8 +28,12 @@ public class SDOUploadConversation extends CANOpenConversationBase { public void execute(CompletableFuture<PlcValue> receiver) { SDOInitiateUploadRequest rq = new SDOInitiateUploadRequest(address); - delegate.send(rq, (ctx) -> - ctx.onError((response, error) -> { + delegate.send(createFrame(rq)) + .check(this::isTransmitSDOFromReceiver) + .onTimeout(receiver::completeExceptionally) + .unwrap(CANOpenFrame::getPayload) + .only(CANOpenSDOResponse.class) + .onError((response, error) -> { if (error != null) { receiver.completeExceptionally(error); return; @@ -45,8 +49,7 @@ public class SDOUploadConversation extends CANOpenConversationBase { .check(response -> response.getAddress().equals(address)) .handle(response -> { handle(receiver, response); - }) - ); + }); } private void handle(CompletableFuture<PlcValue> receiver, SDOInitiateUploadResponse answer) { @@ -76,36 +79,38 @@ public class SDOUploadConversation extends CANOpenConversationBase { private void fetch(ByteStorage.SDOUploadStorage storage, BiConsumer<Integer, byte[]> valueCallback, CompletableFuture<PlcValue> receiver, boolean toggle, int size) { logger.info("Request next data block for address {}/{}", 
Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex())); - delegate.send(new SDOSegmentUploadRequest(toggle), (ctx) -> { - ctx.unwrap(CANOpenSDOResponse::getResponse) - .onError((response, error) -> { - if (error != null) { - receiver.completeExceptionally(error); - return; - } + delegate.send(createFrame(new SDOSegmentUploadRequest(toggle))) + .check(this::isTransmitSDOFromReceiver) + .onTimeout(receiver::completeExceptionally) + .unwrap(CANOpenFrame::getPayload) + .only(CANOpenSDOResponse.class) + .unwrap(CANOpenSDOResponse::getResponse) + .onError((response, error) -> { + if (error != null) { + receiver.completeExceptionally(error); + return; + } - if (response instanceof SDOAbortResponse) { - SDOAbortResponse abort = (SDOAbortResponse) response; - SDOAbort sdoAbort = abort.getAbort(); - receiver.completeExceptionally(new PlcException("Could not read value. Remote party reported code " + sdoAbort.getCode())); - } - }) - .only(SDOSegmentUploadResponse.class) - .check(r -> r.getToggle() == toggle) - .handle(response -> { - storage.append(response); + if (response instanceof SDOAbortResponse) { + SDOAbortResponse abort = (SDOAbortResponse) response; + SDOAbort sdoAbort = abort.getAbort(); + receiver.completeExceptionally(new PlcException("Could not read value. 
Remote party reported code " + sdoAbort.getCode())); + } + }) + .only(SDOSegmentUploadResponse.class) + .check(r -> r.getToggle() == toggle) + .handle(response -> { + storage.append(response); - if (response.getLast()) { - // validate size - logger.trace("Completed reading of data from {}/{}, collected {}, wanted {}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()), storage.size(), size); - valueCallback.accept(Long.valueOf(size).intValue(), storage.get()); - } else { - logger.trace("Continue reading of data from {}/{}, collected {}, wanted {}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()), storage.size(), size); - fetch(storage, valueCallback, receiver, !toggle, size); - } - }); - }); + if (response.getLast()) { + // validate size + logger.trace("Completed reading of data from {}/{}, collected {}, wanted {}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()), storage.size(), size); + valueCallback.accept(Long.valueOf(size).intValue(), storage.get()); + } else { + logger.trace("Continue reading of data from {}/{}, collected {}, wanted {}", Integer.toHexString(address.getIndex()), Integer.toHexString(address.getSubindex()), storage.size(), size); + fetch(storage, valueCallback, receiver, !toggle, size); + } + }); } - } diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java index af4188a..6f18906 100644 --- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java +++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/configuration/CANConfiguration.java @@ -20,6 +20,7 @@ package org.apache.plc4x.java.can.configuration; import org.apache.plc4x.java.spi.configuration.Configuration; import 
org.apache.plc4x.java.spi.configuration.annotations.ConfigurationParameter; +import org.apache.plc4x.java.spi.configuration.annotations.defaults.IntDefaultValue; import org.apache.plc4x.java.transport.socketcan.CANTransportConfiguration; public class CANConfiguration implements Configuration, CANTransportConfiguration { @@ -30,6 +31,10 @@ public class CANConfiguration implements Configuration, CANTransportConfiguratio @ConfigurationParameter private boolean heartbeat; + @ConfigurationParameter("request-timeout") + @IntDefaultValue(1000) + private int requestTimeout; + public int getNodeId() { return nodeId; } @@ -46,4 +51,12 @@ public class CANConfiguration implements Configuration, CANTransportConfiguratio this.heartbeat = heartbeat; } + public int getRequestTimeout() { + return requestTimeout; + } + + public void setRequestTimeout(int requestTimeout) { + this.requestTimeout = requestTimeout; + } + } diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java index a46b17d..f751b7c 100644 --- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java +++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/protocol/CANOpenProtocolLogic.java @@ -31,7 +31,6 @@ import org.apache.plc4x.java.api.value.PlcUSINT; import org.apache.plc4x.java.api.value.PlcValue; import org.apache.plc4x.java.can.canopen.CANOpenFrame; import org.apache.plc4x.java.can.api.conversation.canopen.CANConversation; -import org.apache.plc4x.java.can.api.conversation.canopen.CANOpenConversation; import org.apache.plc4x.java.can.api.conversation.canopen.SDODownloadConversation; import org.apache.plc4x.java.can.api.conversation.canopen.SDOUploadConversation; import org.apache.plc4x.java.can.canopen.CANOpenFrameBuilderFactory; @@ -136,7 +135,7 @@ public class 
CANOpenProtocolLogic extends Plc4xProtocolBase<CANOpenFrame> implem @Override public void setContext(ConversationContext<CANOpenFrame> context) { super.setContext(context); - this.conversation = new SocketCANConversation(configuration.getNodeId(), context, factory); + this.conversation = new SocketCANConversation(configuration.getNodeId(), context, configuration.getRequestTimeout(), factory); } private CANOpenFrame createFrame(CANOpenHeartbeatPayload state) throws ParseException { @@ -186,8 +185,7 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<CANOpenFrame> implem }); PlcValue writeValue = writeRequest.getPlcValues().get(0); - CANOpenConversation canopen = new CANOpenConversation(field.getNodeId(), conversation); - SDODownloadConversation download = canopen.sdo().download(new IndexAddress(field.getIndex(), field.getSubIndex()), writeValue, field.getCanOpenDataType()); + SDODownloadConversation download = new SDODownloadConversation(conversation, field.getNodeId(), new IndexAddress(field.getIndex(), field.getSubIndex()), writeValue, field.getCanOpenDataType()); transaction.submit(() -> download.execute(callback)); } @@ -285,8 +283,7 @@ public class CANOpenProtocolLogic extends Plc4xProtocolBase<CANOpenFrame> implem transaction.endRequest(); }); - CANOpenConversation canopen = new CANOpenConversation(field.getNodeId(), conversation); - SDOUploadConversation upload = canopen.sdo().upload(new IndexAddress(field.getIndex(), field.getSubIndex()), field.getCanOpenDataType()); + SDOUploadConversation upload = new SDOUploadConversation(conversation, field.getNodeId(), new IndexAddress(field.getIndex(), field.getSubIndex()), field.getCanOpenDataType()); transaction.submit(() -> upload.execute(callback)); } diff --git a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java index 721b0f1..305ed68 100644 
--- a/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java +++ b/sandbox/test-java-can-driver/src/main/java/org/apache/plc4x/java/can/socketcan/SocketCANConversation.java @@ -14,11 +14,13 @@ public class SocketCANConversation implements CANConversation<CANOpenFrame> { private final int nodeId; private final ConversationContext<CANOpenFrame> context; + private final int timeout; private final CANOpenFrameBuilderFactory factory; - public SocketCANConversation(int nodeId, ConversationContext<CANOpenFrame> context, CANOpenFrameBuilderFactory factory) { + public SocketCANConversation(int nodeId, ConversationContext<CANOpenFrame> context, int timeout, CANOpenFrameBuilderFactory factory) { this.nodeId = nodeId; this.context = context; + this.timeout = timeout; this.factory = factory; } @@ -32,11 +34,9 @@ public class SocketCANConversation implements CANConversation<CANOpenFrame> { return factory.createBuilder(); } - @Override - public void send(CANOpenFrame frame, Consumer<SendRequestContext<CANOpenFrame>> callback) { - SendRequestContext<CANOpenFrame> ctx = context.sendRequest(frame) - .expectResponse(CANOpenFrame.class, Duration.ofSeconds(10L)); - callback.accept(ctx); + public SendRequestContext<CANOpenFrame> send(CANOpenFrame frame) { + return context.sendRequest(frame) + .expectResponse(CANOpenFrame.class, Duration.ofMillis(timeout)); } }
