This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 2eb67c005e47ea6e4187da61fd1d87ec4e5ebd27
Author: Christofer Dutz <[email protected]>
AuthorDate: Tue Sep 24 10:00:14 2019 +0200

    - Improved and fine-tuned the GeneratedDriverByteToMessageCodec so that it
checks whether enough data is available before decoding, and removes the rest
of a faulty packet after failing to decode it.
    - Added a packetHandler property to the RawSocketChannelFactory
    - Added filter capabilities (address, port, protocol id) to PcapSocketAddress and PcapSocketChannel
    - Added a speed-factor to slow down or speed up the pcap replay
---
 plc4j/protocols/driver-bases/base/pom.xml          |  4 +++
 .../base/GeneratedDriverByteToMessageCodec.java    | 19 ++++++++--
 .../base/connection/RawSocketChannelFactory.java   | 11 +++---
 .../utils/pcapsockets/netty/PcapSocketAddress.java | 27 ++++++++++++--
 .../utils/pcapsockets/netty/PcapSocketChannel.java | 29 +++++++++++++--
 .../pcapsockets/netty/PcapSocketChannelConfig.java | 20 ++++++++++-
 .../utils/rawsockets/netty/RawSocketAddress.java   |  1 -
 ...wSocketAddress.java => UdpIpPacketHandler.java} | 41 ++++++++++------------
 8 files changed, 115 insertions(+), 37 deletions(-)

diff --git a/plc4j/protocols/driver-bases/base/pom.xml 
b/plc4j/protocols/driver-bases/base/pom.xml
index e27b060..ae89e08 100644
--- a/plc4j/protocols/driver-bases/base/pom.xml
+++ b/plc4j/protocols/driver-bases/base/pom.xml
@@ -64,6 +64,10 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-transport</artifactId>
     </dependency>
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>ch.qos.logback</groupId>
diff --git 
a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/GeneratedDriverByteToMessageCodec.java
 
b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/GeneratedDriverByteToMessageCodec.java
index 4ac7fb6..5818622 100644
--- 
a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/GeneratedDriverByteToMessageCodec.java
+++ 
b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/GeneratedDriverByteToMessageCodec.java
@@ -20,6 +20,7 @@ package org.apache.plc4x.java.base;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
+import org.apache.commons.codec.binary.Hex;
 import org.apache.plc4x.java.utils.Message;
 import org.apache.plc4x.java.utils.MessageIO;
 import org.apache.plc4x.java.utils.ReadBuffer;
@@ -48,17 +49,31 @@ public abstract class GeneratedDriverByteToMessageCodec<T 
extends Message> exten
 
     @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, 
List<Object> out) throws Exception {
-        byte[] bytes = new byte[byteBuf.readableBytes()];
+        // Check if enough data is present to process the entire package.
+        int packetSize = getPacketSize(byteBuf);
+        if(packetSize == -1 || packetSize > byteBuf.readableBytes()) {
+            return;
+        }
+
+        byte[] bytes = new byte[packetSize];
         byteBuf.readBytes(bytes);
+
         ReadBuffer readBuffer = new ReadBuffer(bytes);
         while (readBuffer.getPos() < bytes.length) {
             try {
                 T packet = io.parse(readBuffer);
                 out.add(packet);
             } catch (Exception e) {
-                logger.warn("Error decoding package: " + e.getMessage());
+                logger.warn("Error decoding package with content [" + 
Hex.encodeHexString(bytes) + "]: "
+                    + e.getMessage(), e);
+                // Just remove any trailing junk ... if there is any.
+                removeRestOfCorruptPackage(byteBuf);
             }
         }
     }
 
+    abstract protected int getPacketSize(ByteBuf byteBuf);
+
+    abstract protected void removeRestOfCorruptPackage(ByteBuf byteBuf);
+
 }
diff --git 
a/plc4j/protocols/driver-bases/raw-socket/src/main/java/org/apache/plc4x/java/base/connection/RawSocketChannelFactory.java
 
b/plc4j/protocols/driver-bases/raw-socket/src/main/java/org/apache/plc4x/java/base/connection/RawSocketChannelFactory.java
index 80684e6..5ba44c6 100644
--- 
a/plc4j/protocols/driver-bases/raw-socket/src/main/java/org/apache/plc4x/java/base/connection/RawSocketChannelFactory.java
+++ 
b/plc4j/protocols/driver-bases/raw-socket/src/main/java/org/apache/plc4x/java/base/connection/RawSocketChannelFactory.java
@@ -25,10 +25,7 @@ import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.exceptions.PlcException;
-import org.apache.plc4x.java.utils.rawsockets.netty.RawSocketChannel;
-import org.apache.plc4x.java.utils.rawsockets.netty.RawSocketChannelOption;
-import org.apache.plc4x.java.utils.rawsockets.netty.RawSocketIpAddress;
-import org.apache.plc4x.java.utils.rawsockets.netty.TcpIpPacketHandler;
+import org.apache.plc4x.java.utils.rawsockets.netty.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,12 +41,14 @@ public class RawSocketChannelFactory implements 
ChannelFactory {
     private final InetAddress address;
     private final int port;
     private final int protocolId;
+    private final PacketHandler packetHandler;
 
-    public RawSocketChannelFactory(String deviceName, InetAddress address, int 
port, int protocolId) {
+    public RawSocketChannelFactory(String deviceName, InetAddress address, int 
port, int protocolId, PacketHandler packetHandler) {
         this.deviceName = deviceName;
         this.address = address;
         this.port = port;
         this.protocolId = protocolId;
+        this.packetHandler = packetHandler;
     }
 
     @Override
@@ -61,7 +60,7 @@ public class RawSocketChannelFactory implements 
ChannelFactory {
             Bootstrap bootstrap = new Bootstrap();
             bootstrap.group(workerGroup);
             bootstrap.channel(RawSocketChannel.class);
-            bootstrap.option(RawSocketChannelOption.PACKET_HANDLER, new 
TcpIpPacketHandler());
+            bootstrap.option(RawSocketChannelOption.PACKET_HANDLER, 
packetHandler);
             // TODO we should use an explicit (configurable?) timeout here
             // bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
             bootstrap.handler(channelHandler);
diff --git 
a/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketAddress.java
 
b/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketAddress.java
index 78310e2..22cf7fa 100644
--- 
a/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketAddress.java
+++ 
b/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketAddress.java
@@ -19,19 +19,42 @@ under the License.
 package org.apache.plc4x.java.utils.pcapsockets.netty;
 
 import java.io.File;
+import java.net.InetAddress;
 import java.net.SocketAddress;
 
 public class PcapSocketAddress extends SocketAddress {
+
     private static final long serialVersionUID = 1L;
 
-    private File pcapFile;
+    public static final int ALL_PORTS = -1;
+    public static final int ALL_PROTOCOLS = -1;
+
+    private final File pcapFile;
+    private final InetAddress address;
+    private final int port;
+    private final int protocolId;
 
-    public PcapSocketAddress(File pcapFile) {
+    public PcapSocketAddress(File pcapFile, InetAddress address, int port, int 
protocolId) {
         this.pcapFile = pcapFile;
+        this.address = address;
+        this.port = port;
+        this.protocolId = protocolId;
     }
 
     public File getPcapFile() {
         return pcapFile;
     }
 
+    public InetAddress getAddress() {
+        return address;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public int getProtocolId() {
+        return protocolId;
+    }
+
 }
diff --git 
a/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketChannel.java
 
b/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketChannel.java
index aff7ec0..dca632b 100644
--- 
a/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketChannel.java
+++ 
b/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketChannel.java
@@ -83,7 +83,6 @@ public class PcapSocketChannel extends OioByteStreamChannel {
         this.localAddress = localAddress;
         remoteRawSocketAddress = (PcapSocketAddress) remoteAddress;
 
-
         // Get a handle to the network-device and open it.
         File pcapFile = remoteRawSocketAddress.getPcapFile();
         if(!pcapFile.exists()) {
@@ -95,9 +94,16 @@ public class PcapSocketChannel extends OioByteStreamChannel {
         if(logger.isDebugEnabled()) {
             logger.debug(String.format("Opening PCAP capture file at: %s", 
pcapFile.getAbsolutePath()));
         }
+
         handle = 
Pcaps.openOffline(remoteRawSocketAddress.getPcapFile().getAbsolutePath(),
             PcapHandle.TimestampPrecision.NANO);
 
+        // If the address allows fine tuning which packets to process, set a 
filter to reduce the load.
+        String filter = getFilter(remoteRawSocketAddress);
+        if(filter.length() > 0) {
+            handle.setFilter(filter, BpfProgram.BpfCompileMode.OPTIMIZE);
+        }
+
         // Create a buffer where the raw socket worker can send data to.
         ByteBuf buffer = Unpooled.buffer();
 
@@ -114,7 +120,8 @@ public class PcapSocketChannel extends OioByteStreamChannel 
{
 
                         // If last-time is not null, wait for the given number 
of nano-seconds.
                         if(lastPacketTime != null) {
-                            int numMicrosecondsSleep = 
curPacketTime.getNanos() - lastPacketTime.getNanos();
+                            int numMicrosecondsSleep = (int)
+                                ((curPacketTime.getNanos() - 
lastPacketTime.getNanos()) / config.getSpeedFactor());
                             nanoSecondSleep(numMicrosecondsSleep);
                         }
 
@@ -208,6 +215,24 @@ public class PcapSocketChannel extends 
OioByteStreamChannel {
         }
     }
 
+    private String getFilter(PcapSocketAddress pcapSocketAddress) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("(ether proto \\ip)");
+        // Add a filter for source or target address.
+        if(pcapSocketAddress.getAddress() != null) {
+            sb.append(" and (host 
").append(pcapSocketAddress.getAddress().getHostAddress()).append(")");
+        }
+        // Add a filter for TCP or UDP port.
+        if(pcapSocketAddress.getPort() != PcapSocketAddress.ALL_PORTS) {
+            sb.append(" and (port 
").append(pcapSocketAddress.getPort()).append(")");
+        }
+        if(pcapSocketAddress.getProtocolId() != 
PcapSocketAddress.ALL_PROTOCOLS) {
+            sb.append("(ether proto 
").append(pcapSocketAddress.getProtocolId()).append(")");
+        }
+        return sb.toString();
+    }
+
+
     /**
      * This output stream simply discards anything it should send.
      */
diff --git 
a/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketChannelConfig.java
 
b/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketChannelConfig.java
index e1ddb23..7896824 100644
--- 
a/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketChannelConfig.java
+++ 
b/plc4j/utils/pcap-sockets/src/main/java/org/apache/plc4x/java/utils/pcapsockets/netty/PcapSocketChannelConfig.java
@@ -29,6 +29,7 @@ import java.util.Map;
 public class PcapSocketChannelConfig extends DefaultChannelConfig implements 
ChannelConfig {
 
     private PacketHandler packetHandler;
+    private float speedFactor;
 
     public PcapSocketChannelConfig(Channel channel) {
         super(channel);
@@ -42,7 +43,8 @@ public class PcapSocketChannelConfig extends 
DefaultChannelConfig implements Cha
 
     @Override
     public Map<ChannelOption<?>, Object> getOptions() {
-        return getOptions(super.getOptions(), 
PcapSocketChannelOption.PACKET_HANDLER);
+        return getOptions(super.getOptions(),
+            PcapSocketChannelOption.PACKET_HANDLER, 
PcapSocketChannelOption.SPEED_FACTOR);
     }
 
     @Override
@@ -53,6 +55,14 @@ public class PcapSocketChannelConfig extends 
DefaultChannelConfig implements Cha
                 return true;
             }
             return false;
+        } else if(option == PcapSocketChannelOption.SPEED_FACTOR) {
+            if(value instanceof Float) {
+                speedFactor = (float) value;
+                if(speedFactor > 0) {
+                    return true;
+                }
+            }
+            return false;
         } else {
             return super.setOption(option, value);
         }
@@ -66,4 +76,12 @@ public class PcapSocketChannelConfig extends 
DefaultChannelConfig implements Cha
         return packetHandler;
     }
 
+    public float getSpeedFactor() {
+        return speedFactor;
+    }
+
+    public void setSpeedFactor(float speedFactor) {
+        this.speedFactor = speedFactor;
+    }
+
 }
diff --git 
a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketAddress.java
 
b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketAddress.java
index 0ef6312..ceb85f5 100644
--- 
a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketAddress.java
+++ 
b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketAddress.java
@@ -18,7 +18,6 @@ under the License.
 */
 package org.apache.plc4x.java.utils.rawsockets.netty;
 
-import java.net.InetAddress;
 import java.net.SocketAddress;
 
 public class RawSocketAddress extends SocketAddress {
diff --git 
a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketAddress.java
 
b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/UdpIpPacketHandler.java
similarity index 53%
copy from 
plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketAddress.java
copy to 
plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/UdpIpPacketHandler.java
index 0ef6312..1b030df 100644
--- 
a/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/RawSocketAddress.java
+++ 
b/plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/UdpIpPacketHandler.java
@@ -18,29 +18,24 @@ under the License.
 */
 package org.apache.plc4x.java.utils.rawsockets.netty;
 
-import java.net.InetAddress;
-import java.net.SocketAddress;
-
-public class RawSocketAddress extends SocketAddress {
-    private static final long serialVersionUID = 1L;
-
-    public static final int ALL_PROTOCOLS = -1;
-
-    private final String deviceName;
-
-    private final int protocolId;
-
-    public RawSocketAddress(String deviceName, int protocolId) {
-        this.deviceName = deviceName;
-        this.protocolId = protocolId;
-    }
-
-    public String getDeviceName() {
-        return deviceName;
-    }
-
-    public int getProtocolId() {
-        return protocolId;
+import org.pcap4j.packet.*;
+
+/**
+ * Little helper to automatically unwrap UDP packets to only
+ * pass along the payload and not the raw Ethernet packet.
+ */
+public class UdpIpPacketHandler implements PacketHandler {
+
+    @Override
+    public byte[] getData(Packet packet) {
+        EthernetPacket ethernetPacket = (EthernetPacket) packet;
+        IpV4Packet ipv4Packet = (IpV4Packet) ethernetPacket.getPayload();
+        UdpPacket udpPacket = (UdpPacket) ipv4Packet.getPayload();
+        if(udpPacket.getPayload() instanceof UnknownPacket) {
+            UnknownPacket unknownPacket = (UnknownPacket) 
udpPacket.getPayload();
+            return unknownPacket.getRawData();
+        }
+        return new byte[0];
     }
 
 }

Reply via email to