This is an automated email from the ASF dual-hosted git repository. cdutz pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 2eb67c005e47ea6e4187da61fd1d87ec4e5ebd27 Author: Christofer Dutz <[email protected]> AuthorDate: Tue Sep 24 10:00:14 2019 +0200 - Improved and fine-tuned the GeneratedDriverByteToMessageCodec to allow checking if enough data is available and to remove the rest of faulty packages after failing to decode. - Added a protocolHandler property to the RawSocketChannelFactory - Added filter capabilities to the PcapSocket stuff - Added a speed-factor to slow down or speed up the pcap replay --- plc4j/protocols/driver-bases/base/pom.xml | 4 +++ .../base/GeneratedDriverByteToMessageCodec.java | 19 ++++++++-- .../base/connection/RawSocketChannelFactory.java | 11 +++--- .../utils/pcapsockets/netty/PcapSocketAddress.java | 27 ++++++++++++-- .../utils/pcapsockets/netty/PcapSocketChannel.java | 29 +++++++++++++-- .../pcapsockets/netty/PcapSocketChannelConfig.java | 20 ++++++++++- .../utils/rawsockets/netty/RawSocketAddress.java | 1 - ...wSocketAddress.java => UdpIpPacketHandler.java} | 41 ++++++++++------------ 8 files changed, 115 insertions(+), 37 deletions(-) diff --git a/plc4j/protocols/driver-bases/base/pom.xml b/plc4j/protocols/driver-bases/base/pom.xml index e27b060..ae89e08 100644 --- a/plc4j/protocols/driver-bases/base/pom.xml +++ b/plc4j/protocols/driver-bases/base/pom.xml @@ -64,6 +64,10 @@ <groupId>io.netty</groupId> <artifactId>netty-transport</artifactId> </dependency> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </dependency> <dependency> <groupId>ch.qos.logback</groupId> diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/GeneratedDriverByteToMessageCodec.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/GeneratedDriverByteToMessageCodec.java index 4ac7fb6..5818622 100644 --- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/GeneratedDriverByteToMessageCodec.java +++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/GeneratedDriverByteToMessageCodec.java @@ -20,6 +20,7 @@ package org.apache.plc4x.java.base; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; +import org.apache.commons.codec.binary.Hex; import org.apache.plc4x.java.utils.Message; import org.apache.plc4x.java.utils.MessageIO; import org.apache.plc4x.java.utils.ReadBuffer; @@ -48,17 +49,31 @@ public abstract class GeneratedDriverByteToMessageCodec<T extends Message> exten @Override protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception { - byte[] bytes = new byte[byteBuf.readableBytes()]; + // Check if enough data is present to process the entire package. + int packetSize = getPacketSize(byteBuf); + if(packetSize == -1 || packetSize > byteBuf.readableBytes()) { + return; + } + + byte[] bytes = new byte[packetSize]; byteBuf.readBytes(bytes); + ReadBuffer readBuffer = new ReadBuffer(bytes); while (readBuffer.getPos() < bytes.length) { try { T packet = io.parse(readBuffer); out.add(packet); } catch (Exception e) { - logger.warn("Error decoding package: " + e.getMessage()); + logger.warn("Error decoding package with content [" + Hex.encodeHexString(bytes) + "]: " + + e.getMessage(), e); + // Just remove any trailing junk ... if there is any. + removeRestOfCorruptPackage(byteBuf); } } } + abstract protected int getPacketSize(ByteBuf byteBuf); + + abstract protected void removeRestOfCorruptPackage(ByteBuf byteBuf); + } diff --git a/plc4j/protocols/driver-bases/raw-socket/src/main/java/org/apache/plc4x/java/base/connection/RawSocketChannelFactory.java b/plc4j/protocols/driver-bases/raw-socket/src/main/java/org/apache/plc4x/java/base/connection/RawSocketChannelFactory.java index 80684e6..5ba44c6 100644 --- a/plc4j/protocols/driver-bases/raw-socket/src/main/java/org/apache/plc4x/java/base/connection/RawSocketChannelFactory.java +++ b/plc4j/protocols/driver-bases/raw-socket/src/main/java/org/apache/plc4x/java/base/connection/RawSocketChannelFactory.java @@ -25,10 +25,7 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import org.apache.plc4x.java.api.exceptions.PlcConnectionException; import org.apache.plc4x.java.api.exceptions.PlcException; -import org.apache.plc4x.java.utils.rawsockets.netty.RawSocketChannel; -import org.apache.plc4x.java.utils.rawsockets.netty.RawSocketChannelOption; -import org.apache.plc4x.java.utils.rawsockets.netty.RawSocketIpAddress; -import org.apache.plc4x.java.utils.rawsockets.netty.TcpIpPacketHandler; +import org.apache.plc4x.java.utils.rawsockets.netty.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,12 +41,14 @@ public class RawSocketChannelFactory implements ChannelFactory { private final InetAddress address; private final int port; private final int protocolId; + private final PacketHandler packetHandler; - public RawSocketChannelFactory(String deviceName, InetAddress address, int port, int protocolId) { + public RawSocketChannelFactory(String deviceName, InetAddress address, int port, int protocolId, PacketHandler packetHandler) { this.deviceName = deviceName; this.address = address; this.port = port; this.protocolId = protocolId; + this.packetHandler = packetHandler; } @Override @@ -61,7 +60,7 @@ public class RawSocketChannelFactory implements ChannelFactory { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup); bootstrap.channel(RawSocketChannel.class); - bootstrap.option(RawSocketChannelOption.PACKET_HANDLER, new TcpIpPacketHandler()); + bootstrap.option(RawSocketChannelOption.PACKET_HANDLER, packetHandler); // TODO we should use an explicit (configurable?) timeout here // bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000); bootstrap.handler(channelHandler); diff --git a/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketAddress.java b/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketAddress.java index 78310e2..22cf7fa 100644 --- a/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketAddress.java +++ b/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketAddress.java @@ -19,19 +19,42 @@ under the License. package org.apache.plc4x.java.utils.pcapsockets.netty; import java.io.File; +import java.net.InetAddress; import java.net.SocketAddress; public class PcapSocketAddress extends SocketAddress { + private static final long serialVersionUID = 1L; - private File pcapFile; + public static final int ALL_PORTS = -1; + public static final int ALL_PROTOCOLS = -1; + + private final File pcapFile; + private final InetAddress address; + private final int port; + private final int protocolId; - public PcapSocketAddress(File pcapFile) { + public PcapSocketAddress(File pcapFile, InetAddress address, int port, int protocolId) { this.pcapFile = pcapFile; + this.address = address; + this.port = port; + this.protocolId = protocolId; } public File getPcapFile() { return pcapFile; } + public InetAddress getAddress() { + return address; + } + + public int getPort() { + return port; + } + + public int getProtocolId() { + return protocolId; + } + } diff --git a/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketChannel.java b/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketChannel.java index aff7ec0..dca632b 100644 --- a/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketChannel.java +++ b/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketChannel.java @@ -83,7 +83,6 @@ public class PcapSocketChannel extends OioByteStreamChannel { this.localAddress = localAddress; remoteRawSocketAddress = (PcapSocketAddress) remoteAddress; - // Get a handle to the network-device and open it. File pcapFile = remoteRawSocketAddress.getPcapFile(); if(!pcapFile.exists()) { @@ -95,9 +94,16 @@ public class PcapSocketChannel extends OioByteStreamChannel { if(logger.isDebugEnabled()) { logger.debug(String.format("Opening PCAP capture file at: %s", pcapFile.getAbsolutePath())); } + handle = Pcaps.openOffline(remoteRawSocketAddress.getPcapFile().getAbsolutePath(), PcapHandle.TimestampPrecision.NANO); + // If the address allows fine tuning which packets to process, set a filter to reduce the load. + String filter = getFilter(remoteRawSocketAddress); + if(filter.length() > 0) { + handle.setFilter(filter, BpfProgram.BpfCompileMode.OPTIMIZE); + } + // Create a buffer where the raw socket worker can send data to. ByteBuf buffer = Unpooled.buffer(); @@ -114,7 +120,8 @@ public class PcapSocketChannel extends OioByteStreamChannel { // If last-time is not null, wait for the given number of nano-seconds. if(lastPacketTime != null) { - int numMicrosecondsSleep = curPacketTime.getNanos() - lastPacketTime.getNanos(); + int numMicrosecondsSleep = (int) + ((curPacketTime.getNanos() - lastPacketTime.getNanos()) / config.getSpeedFactor()); nanoSecondSleep(numMicrosecondsSleep); } @@ -208,6 +215,24 @@ public class PcapSocketChannel extends OioByteStreamChannel { } } + private String getFilter(PcapSocketAddress pcapSocketAddress) { + StringBuilder sb = new StringBuilder(); + sb.append("(ether proto \\ip)"); + // Add a filter for source or target address. + if(pcapSocketAddress.getAddress() != null) { + sb.append(" and (host ").append(pcapSocketAddress.getAddress().getHostAddress()).append(")"); + } + // Add a filter for TCP or UDP port. + if(pcapSocketAddress.getPort() != PcapSocketAddress.ALL_PORTS) { + sb.append(" and (port ").append(pcapSocketAddress.getPort()).append(")"); + } + if(pcapSocketAddress.getProtocolId() != PcapSocketAddress.ALL_PROTOCOLS) { + sb.append("(ether proto ").append(pcapSocketAddress.getProtocolId()).append(")"); + } + return sb.toString(); + } + + /** * This output stream simply discards anything it should send. */ diff --git a/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketChannelConfig.java b/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketChannelConfig.java index e1ddb23..7896824 100644 --- a/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketChannelConfig.java +++ b/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketChannelConfig.java @@ -29,6 +29,7 @@ import java.util.Map; public class PcapSocketChannelConfig extends DefaultChannelConfig implements ChannelConfig { private PacketHandler packetHandler; + private float speedFactor; public PcapSocketChannelConfig(Channel channel) { super(channel); @@ -42,7 +43,8 @@ public class PcapSocketChannelConfig extends DefaultChannelConfig implements Cha @Override public Map<ChannelOption<?>, Object> getOptions() { - return getOptions(super.getOptions(), PcapSocketChannelOption.PACKET_HANDLER); + return getOptions(super.getOptions(), + PcapSocketChannelOption.PACKET_HANDLER, PcapSocketChannelOption.SPEED_FACTOR); } @Override @@ -53,6 +55,14 @@ public class PcapSocketChannelConfig extends DefaultChannelConfig implements Cha return true; } return false; + } else if(option == PcapSocketChannelOption.SPEED_FACTOR) { + if(value instanceof Float) { + speedFactor = (float) value; + if(speedFactor > 0) { + return true; + } + } + return false; } else { return super.setOption(option, value); } @@ -66,4 +76,12 @@ public class PcapSocketChannelConfig extends DefaultChannelConfig implements Cha return packetHandler; } + public float getSpeedFactor() { + return speedFactor; + } + + public void setSpeedFactor(float speedFactor) { + this.speedFactor = speedFactor; + } + } diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketAddress.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketAddress.java index 0ef6312..ceb85f5 100644 --- a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketAddress.java +++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketAddress.java @@ -18,7 +18,6 @@ under the License. */ package org.apache.plc4x.java.utils.rawsockets.netty; -import java.net.InetAddress; import java.net.SocketAddress; public class RawSocketAddress extends SocketAddress { diff --git a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketAddress.java b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/UdpIpPacketHandler.java similarity index 53% copy from plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketAddress.java copy to plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/UdpIpPacketHandler.java index 0ef6312..1b030df 100644 --- a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketAddress.java +++ b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/UdpIpPacketHandler.java @@ -18,29 +18,24 @@ under the License. */ package org.apache.plc4x.java.utils.rawsockets.netty; -import java.net.InetAddress; -import java.net.SocketAddress; - -public class RawSocketAddress extends SocketAddress { - private static final long serialVersionUID = 1L; - - public static final int ALL_PROTOCOLS = -1; - - private final String deviceName; - - private final int protocolId; - - public RawSocketAddress(String deviceName, int protocolId) { - this.deviceName = deviceName; - this.protocolId = protocolId; - } - - public String getDeviceName() { - return deviceName; - } - - public int getProtocolId() { - return protocolId; +import org.pcap4j.packet.*; + +/** + * Little helper to automatically unwrap TCP packets to only + * pass along the payload and not the raw Ethernet packet. + */ +public class UdpIpPacketHandler implements PacketHandler { + + @Override + public byte[] getData(Packet packet) { + EthernetPacket ethernetPacket = (EthernetPacket) packet; + IpV4Packet ipv4Packet = (IpV4Packet) ethernetPacket.getPayload(); + UdpPacket udpPacket = (UdpPacket) ipv4Packet.getPayload(); + if(udpPacket.getPayload() instanceof UnknownPacket) { + UnknownPacket unknownPacket = (UnknownPacket) udpPacket.getPayload(); + return unknownPacket.getRawData(); + } + return new byte[0]; } }
