This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 4b63d690631ebde9fdf0f40de59a7aff287a260e
Author: JackHintonSmartDCSIT <jack.hin...@smartdcs.co.uk>
AuthorDate: Tue Apr 23 11:12:26 2024 +0100

    NIFI-13082 Added SplitPCAP Processor
    
    This closes #8691
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
---
 .../nifi-network-processors/pom.xml                |  11 +
 .../processors/network/pcap/ByteBufferReader.java  |  59 +++++
 .../apache/nifi/processors/network/pcap/PCAP.java  | 127 ++++++++++
 .../nifi/processors/network/pcap/PCAPHeader.java   |  83 +++++++
 .../nifi/processors/network/pcap/Packet.java       | 145 +++++++++++
 .../nifi/processors/network/pcap/SplitPCAP.java    | 265 +++++++++++++++++++++
 .../services/org.apache.nifi.processor.Processor   |   3 +-
 .../nifi/processors/network/pcap/TestPCAP.java     | 104 ++++++++
 .../processors/network/pcap/TestSplitPCAP.java     | 134 +++++++++++
 9 files changed, 930 insertions(+), 1 deletion(-)

diff --git 
a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/pom.xml 
b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/pom.xml
index cd03dcc5af..34a9e5e3ec 100644
--- a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/pom.xml
+++ b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/pom.xml
@@ -39,5 +39,16 @@
                        <groupId>com.fasterxml.jackson.core</groupId>
                        <artifactId>jackson-databind</artifactId>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-mock</artifactId>
+                       <version>2.0.0-SNAPSHOT</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.junit.jupiter</groupId>
+                       <artifactId>junit-jupiter-api</artifactId>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 </project>
diff --git 
a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/ByteBufferReader.java
 
b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/ByteBufferReader.java
new file mode 100644
index 0000000000..bfa0db81da
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/ByteBufferReader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network.pcap;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
/**
 * Little-endian reader over a byte array, exposing the primitive read
 * operations needed to parse PCAP global headers and packet records.
 */
public class ByteBufferReader {
    private final ByteBuffer buffer;

    public ByteBufferReader(byte[] byteArray) {
        this.buffer = ByteBuffer.wrap(byteArray);
        // PCAP numeric fields written by libpcap are little-endian
        buffer.order(ByteOrder.LITTLE_ENDIAN);
    }

    /**
     * Reads an unsigned 16-bit integer, widened to an int.
     */
    public int readU2() {
        return buffer.getShort() & 0xffff;
    }

    /**
     * Reads an unsigned 32-bit integer, widened to a long.
     */
    public long readU4() {
        return buffer.getInt() & 0xffffffffL;
    }

    /**
     * Reads a signed 32-bit integer.
     */
    public int readS4() {
        return buffer.getInt();
    }

    /**
     * Reads the next n bytes into a newly allocated array.
     */
    public byte[] readBytes(int n) {
        byte[] output = new byte[n];
        buffer.get(output);
        return output;
    }

    /**
     * Reads the next n bytes. Throws ArithmeticException when n does not fit
     * in an int, instead of the silent (int) truncation that previously
     * produced a negative array size for large unsigned lengths.
     */
    public byte[] readBytes(long n) {
        return readBytes(Math.toIntExact(n));
    }

    /**
     * Number of unread bytes remaining.
     */
    public int bytesLeft() {
        return buffer.remaining();
    }

    public boolean hasRemaining() {
        return buffer.hasRemaining();
    }
}
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/PCAP.java
 
b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/PCAP.java
new file mode 100644
index 0000000000..ec3862f5b5
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/PCAP.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network.pcap;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * PCAP (named after libpcap / winpcap) is a popular format for saving
+ * network traffic grabbed by network sniffers. It is typically
+ * produced by tools like [tcpdump](<a 
href="https://www.tcpdump.org/";>...</a>) or
+ * [Wireshark](<a href="https://www.wireshark.org/";>...</a>).
+ *
+ * @see <a href=
+ * "https://wiki.wireshark.org/Development/LibpcapFileFormat";>Source</a>
+ */
+public class PCAP {
+    private final PCAPHeader hdr;
+    private final List<Packet> packets;
+
+    public PCAP(ByteBufferReader io) {
+        this.hdr = new PCAPHeader(io);
+        this.packets = new ArrayList<>();
+        while (io.hasRemaining()) {
+            this.packets.add(new Packet(io, this));
+        }
+    }
+
+    public PCAP(PCAPHeader hdr, List<Packet> packets) {
+        this.hdr = hdr;
+        this.packets = packets;
+    }
+
+    public byte[] toByteArray() {
+        int headerBufferSize = PCAPHeader.PCAP_HEADER_LENGTH;
+        ByteBuffer headerBuffer = ByteBuffer.allocate(headerBufferSize);
+        headerBuffer.order(ByteOrder.LITTLE_ENDIAN);
+
+        headerBuffer.put(this.hdr.magicNumber());
+        headerBuffer.put(readIntToNBytes(this.hdr.versionMajor(), 2));
+        headerBuffer.put(readIntToNBytes(this.hdr.versionMinor(), 2));
+        headerBuffer.put(readIntToNBytes(this.hdr.thiszone(), 4));
+        headerBuffer.put(readLongToNBytes(this.hdr.sigfigs(), 4, true));
+        headerBuffer.put(readLongToNBytes(this.hdr.snaplen(), 4, true));
+        headerBuffer.put(readLongToNBytes(this.hdr.network(), 4, true));
+
+        List<byte[]> packetByteArrays = new ArrayList<>();
+
+        int packetBufferSize = 0;
+
+        for (Packet currentPacket : packets) {
+            int currentPacketTotalLength = Packet.PACKET_HEADER_LENGTH + 
currentPacket.rawBody().length;
+
+            ByteBuffer currentPacketBytes = 
ByteBuffer.allocate(currentPacketTotalLength);
+            currentPacketBytes.put(readLongToNBytes(currentPacket.tsSec(), 4, 
false));
+            currentPacketBytes.put(readLongToNBytes(currentPacket.tsUsec(), 4, 
false));
+            currentPacketBytes.put(readLongToNBytes(currentPacket.inclLen(), 
4, false));
+            currentPacketBytes.put(readLongToNBytes(currentPacket.origLen(), 
4, false));
+            currentPacketBytes.put(currentPacket.rawBody());
+
+            packetByteArrays.add(currentPacketBytes.array());
+            packetBufferSize += currentPacketTotalLength;
+        }
+
+        ByteBuffer packetBuffer = ByteBuffer.allocate(packetBufferSize);
+        packetBuffer.order(ByteOrder.LITTLE_ENDIAN);
+
+        for (byte[] packetByteArray : packetByteArrays) {
+            packetBuffer.put(packetByteArray);
+        }
+
+        ByteBuffer allBytes = ByteBuffer.allocate(headerBufferSize + 
packetBufferSize);
+        allBytes.order(ByteOrder.LITTLE_ENDIAN);
+
+        allBytes.put(headerBuffer.array());
+        allBytes.put(packetBuffer.array());
+
+        return allBytes.array();
+    }
+
+    protected static byte[] readIntToNBytes(int input, int numberOfBytes) {
+        byte[] output = new byte[numberOfBytes];
+        output[0] = (byte) (input & 0xff);
+        for (int loop = 1; loop < numberOfBytes; loop++) {
+            output[loop] = (byte) (input >>> (8 * loop));
+        }
+        return output;
+    }
+
+    private byte[] readLongToNBytes(long input, int numberOfBytes, boolean 
isSigned) {
+        byte[] output = new byte[numberOfBytes];
+        output[0] = (byte) (input & 0xff);
+        for (int loop = 1; loop < numberOfBytes; loop++) {
+            if (isSigned) {
+                output[loop] = (byte) (input >> (8 * loop));
+            } else {
+                output[loop] = (byte) (input >>> (8 * loop));
+            }
+        }
+        return output;
+    }
+
+    public PCAPHeader getHeader() {
+        return hdr;
+    }
+
+    public List<Packet> getPackets() {
+        return packets;
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/PCAPHeader.java
 
b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/PCAPHeader.java
new file mode 100644
index 0000000000..6dd5bff330
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/PCAPHeader.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network.pcap;
+
+public class PCAPHeader {
+    static final int PCAP_HEADER_LENGTH = 24;
+    private final byte[] magicNumber;
+    private final int versionMajor;
+    private final int versionMinor;
+    private final int thiszone;
+    private final long sigfigs;
+    private final long snaplen;
+    private final long network;
+
+    public PCAPHeader(ByteBufferReader io) {
+        this.magicNumber = io.readBytes(4);
+        this.versionMajor = io.readU2();
+        this.versionMinor = io.readU2();
+        this.thiszone = io.readS4();
+        this.sigfigs = io.readU4();
+        this.snaplen = io.readU4();
+        this.network = io.readU4();
+    }
+
+    public byte[] magicNumber() {
+        return magicNumber;
+    }
+
+    public int versionMajor() {
+        return versionMajor;
+    }
+
+    public int versionMinor() {
+        return versionMinor;
+    }
+
+    /**
+     * Correction time in seconds between UTC and the local
+     * timezone of the following packet header timestamps.
+     */
+    public int thiszone() {
+        return thiszone;
+    }
+
+    /**
+     * In theory, the accuracy of time stamps in the capture; in
+     * practice, all tools set it to 0.
+     */
+    public long sigfigs() {
+        return sigfigs;
+    }
+
+    /**
+     * The "snapshot length" for the capture (typically 65535 or
+     * even more, but might be limited by the user), see: incl_len
+     * vs. orig_len.
+     */
+    public long snaplen() {
+        return snaplen;
+    }
+
+    /**
+     * Link-layer header type, specifying the type of headers at
+     * the beginning of the packet.
+     */
+    public long network() {
+        return network;
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/Packet.java
 
b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/Packet.java
new file mode 100644
index 0000000000..086f6675e4
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/Packet.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network.pcap;
+
+public class Packet {
+    static final int PACKET_HEADER_LENGTH = 16;
+
+    private ByteBufferReader io;
+    private long tsSec;
+    private long tsUsec;
+    private long inclLen;
+    private long origLen;
+    private long expectedLength;
+    private int totalLength;
+    private PCAP root;
+    private byte[] rawBody;
+    private String invalidityReason;
+
+    public Packet(ByteBufferReader io, PCAP root) {
+        this.root = root;
+        this.io = io;
+        read();
+    }
+
+    public Packet(byte[] headerArray, PCAP root) {
+        this.root = root;
+        this.io = new ByteBufferReader(headerArray);
+        read();
+    }
+
+    public Packet(long tSSec, long tSUsec, long inclLen, long origLen, byte[] 
rawBody) {
+        // packet header properties
+        this.tsSec = tSSec;
+        this.tsUsec = tSUsec;
+        this.inclLen = inclLen;
+        this.origLen = origLen;
+
+        // packet calculated properties
+        this.expectedLength = inclLen;
+        this.totalLength = PACKET_HEADER_LENGTH + rawBody.length;
+        this.rawBody = rawBody;
+        this.setValidity();
+    }
+
+    private void read() {
+        this.tsSec = this.io.readU4();
+        this.tsUsec = this.io.readU4();
+        this.inclLen = this.io.readU4();
+        this.origLen = this.io.readU4();
+
+        this.expectedLength = Math.min(inclLen(), 
root().getHeader().snaplen());
+
+        if (this.io.bytesLeft() >= expectedLength) {
+            this.rawBody = this.io.readBytes(expectedLength);
+        } else {
+            this.rawBody = new byte[0];
+        }
+        this.setValidity();
+        this.totalLength = PACKET_HEADER_LENGTH + this.rawBody.length;
+    }
+
+    private void setValidity() {
+        this.invalidityReason = null;
+        if (this.rawBody.length == 0) {
+            this.invalidityReason = "Packet body is empty";
+        } else if (this.inclLen > this.origLen) {
+            this.invalidityReason = "The reported length of this packet 
exceeds the reported length of the original packet "
+                    + "as sent on the network; (" + this.inclLen + " > " + 
this.origLen + ")";
+        } else if (this.origLen == 0) {
+            this.invalidityReason = "The reported original length of this 
packet as send on the network is 0.";
+        } else if (this.inclLen == 0) {
+            this.invalidityReason = "The reported length of this packet is 0.";
+        }
+    }
+
+    public boolean isInvalid() {
+        return this.invalidityReason != null;
+    }
+
+    public long tsSec() {
+        return tsSec;
+    }
+
+    public long tsUsec() {
+        return tsUsec;
+    }
+
+    /**
+     * Number of bytes of packet data actually captured and saved in the file.
+     */
+    public long inclLen() {
+        return inclLen;
+    }
+
+    /**
+     * Length of the packet as it appeared on the network when it was captured.
+     */
+    public long origLen() {
+        return origLen;
+    }
+
+    /**
+     * @see <a href=
+     * 
"https://wiki.wireshark.org/Development/LibpcapFileFormat#Packet_Data";>Source</a>
+     */
+    public PCAP root() {
+        return root;
+    }
+
+    public byte[] rawBody() {
+        return rawBody;
+    }
+
+    public long expectedLength() {
+        return expectedLength;
+    }
+
+    public int totalLength() {
+        return totalLength;
+    }
+
+    public String invalidityReason() {
+        return invalidityReason;
+    }
+
+    public void setBody(byte[] newBody) {
+        this.rawBody = newBody;
+        this.setValidity();
+        this.totalLength = PACKET_HEADER_LENGTH + rawBody.length;
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/SplitPCAP.java
 
b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/SplitPCAP.java
new file mode 100644
index 0000000000..7074f55223
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/SplitPCAP.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network.pcap;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+@SideEffectFree
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"PCAP", "Splitter", "Network", "Packet", "Capture", "Wireshark", 
"TShark", "TcpDump", "WinDump", "sniffers"})
+@CapabilityDescription("Splits one pcap file into multiple pcap files based on 
a maximum size.")
+@WritesAttributes(
+        {
+                @WritesAttribute(
+                        attribute = SplitPCAP.ERROR_REASON_LABEL,
+                        description = "The reason the FlowFile was sent to the 
failure relationship."
+                ),
+                @WritesAttribute(
+                        attribute = "fragment.identifier",
+                        description = "All split PCAP FlowFiles produced from 
the same parent PCAP FlowFile will have the same randomly generated UUID added 
for this attribute"
+                ),
+                @WritesAttribute(
+                        attribute = "fragment.index",
+                        description = "A one-up number that indicates the 
ordering of the split PCAP FlowFiles that were created from a single parent 
PCAP FlowFile"
+                ),
+                @WritesAttribute(
+                        attribute = "fragment.count",
+                        description = "The number of split PCAP FlowFiles 
generated from the parent PCAP FlowFile"
+                ),
+                @WritesAttribute(
+                        attribute = "segment.original.filename",
+                        description = "The filename of the parent PCAP 
FlowFile"
+                )
+        }
+)
+public class SplitPCAP extends AbstractProcessor {
+
+    protected static final String ERROR_REASON_LABEL = "error.reason";
+    public static final String FRAGMENT_ID = 
FragmentAttributes.FRAGMENT_ID.key();
+    public static final String FRAGMENT_INDEX = 
FragmentAttributes.FRAGMENT_INDEX.key();
+    public static final String FRAGMENT_COUNT = 
FragmentAttributes.FRAGMENT_COUNT.key();
+    public static final String SEGMENT_ORIGINAL_FILENAME = 
FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
+
+    public static final PropertyDescriptor PCAP_MAX_SIZE = new 
PropertyDescriptor
+            .Builder().name("PCAP Max Size")
+            .displayName("PCAP Max Size")
+            .description("Maximum size of each output PCAP file. PCAP packets 
larger than the configured size result in routing FlowFiles to the failure 
relationship.")
+            .required(true)
+            .defaultValue("1 MB")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("The original FlowFile that was split into segments. 
If the FlowFile fails processing, nothing will be sent to "
+                    + "this relationship")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile cannot be transformed from the 
configured input format to the configured output format, "
+                    + "the unchanged FlowFile will be routed to this 
relationship.")
+            .build();
+
+    public static final Relationship REL_SPLIT = new Relationship.Builder()
+            .name("split")
+            .description("The individual PCAP 'segments' of the original PCAP 
FlowFile will be routed to this relationship.")
+            .build();
+
+    private static final List<PropertyDescriptor> DESCRIPTORS = 
List.of(PCAP_MAX_SIZE);
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Set.of(REL_ORIGINAL, REL_FAILURE, REL_SPLIT);
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    /**
+     * This method is called when a trigger event occurs in the processor.
+     * It processes the incoming flow file, splits it into smaller pcap files 
based on the PCAP Max Size,
+     * and transfers the split pcap files to the success relationship.
+     * If the flow file is empty or not parseable, it is transferred to the 
failure relationship.
+     *
+     * @param context the process context
+     * @param session the process session
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+
+        FlowFile originalFlowFile = session.get();
+        if (originalFlowFile == null) {
+            return;
+        }
+        final int pcapMaxSize = 
context.getProperty(PCAP_MAX_SIZE.getName()).asDataSize(DataUnit.B).intValue();
+        final PCAPStreamSplitterCallback callback = new 
PCAPStreamSplitterCallback(session, originalFlowFile, pcapMaxSize);
+
+        try {
+            session.read(originalFlowFile, callback);
+        } catch (ProcessException e) {
+            getLogger().error("Failed to split {}", originalFlowFile, e);
+            session.remove(callback.getSplitFiles());
+            session.putAttribute(originalFlowFile, ERROR_REASON_LABEL, 
e.getMessage());
+            session.transfer(originalFlowFile, REL_FAILURE);
+            return;
+        }
+
+        final String fragmentId = UUID.randomUUID().toString();
+        final String originalFileName = 
originalFlowFile.getAttribute(CoreAttributes.FILENAME.key());
+        final String originalFileNameWithoutExtension = 
originalFileName.substring(0, originalFileName.lastIndexOf("."));
+
+        final List<FlowFile> splitFiles = callback.getSplitFiles();
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(FRAGMENT_COUNT, String.valueOf(splitFiles.size()));
+        attributes.put(FRAGMENT_ID, fragmentId);
+        attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName);
+
+        IntStream.range(0, splitFiles.size()).forEach(index -> {
+            FlowFile split = splitFiles.get(index);
+            attributes.put(CoreAttributes.FILENAME.key(), 
"%s-%d.pcap".formatted(originalFileNameWithoutExtension, index));
+            attributes.put(FRAGMENT_INDEX, Integer.toString(index));
+            session.transfer(session.putAllAttributes(split, attributes), 
REL_SPLIT);
+        });
+        session.transfer(originalFlowFile, REL_ORIGINAL);
+    }
+
+    protected static class PCAPStreamSplitterCallback implements 
InputStreamCallback {
+        private final ProcessSession session;
+        private final FlowFile originalFlowFile;
+        private final int pcapMaxSize;
+        private final List<FlowFile> splitFiles = new ArrayList<>();
+
+        public List<FlowFile> getSplitFiles() {
+            return splitFiles;
+        }
+
+        public PCAPStreamSplitterCallback(ProcessSession session, FlowFile 
flowFile, int pcapMaxSize) {
+            this.session = session;
+            this.originalFlowFile = flowFile;
+            this.pcapMaxSize = pcapMaxSize;
+        }
+
+        private Packet getNextPacket(final BufferedInputStream bufferedStream, 
final PCAP templatePcap, final int totalPackets) throws IOException {
+            final byte[] packetHeader = new byte[Packet.PACKET_HEADER_LENGTH];
+            StreamUtils.read(bufferedStream, packetHeader, 
Packet.PACKET_HEADER_LENGTH);
+
+            final Packet currentPacket = new Packet(packetHeader, 
templatePcap);
+
+            if (currentPacket.totalLength() > this.pcapMaxSize) {
+                throw new ProcessException("PCAP Packet length [%d] larger 
then configured maximum [%d]".formatted(currentPacket.totalLength(), 
pcapMaxSize));
+            }
+
+            final int expectedLength = (int) currentPacket.expectedLength();
+            final byte[] packetBody = new byte[expectedLength];
+            StreamUtils.read(bufferedStream, packetBody, expectedLength);
+            currentPacket.setBody(packetBody);
+
+            if (currentPacket.isInvalid()) {
+                throw new ProcessException("PCAP contains an invalid packet. 
Packet number [%d] is invalid - [%s]".formatted(totalPackets, 
currentPacket.invalidityReason()));
+            }
+
+            return currentPacket;
+        }
+
+        @Override
+        public void process(final InputStream inStream) throws IOException {
+            final List<Packet> loadedPackets = new ArrayList<>();
+            final BufferedInputStream bufferedStream = new 
BufferedInputStream(inStream);
+            int totalPackets = 1;
+
+            if (bufferedStream.available() == 0) {
+                throw new ProcessException("Input PCAP file empty");
+            }
+
+            final byte[] pcapHeader = new byte[PCAPHeader.PCAP_HEADER_LENGTH];
+            StreamUtils.read(bufferedStream, pcapHeader, 
PCAPHeader.PCAP_HEADER_LENGTH);
+
+            int currentPcapTotalLength = PCAPHeader.PCAP_HEADER_LENGTH;
+
+            final PCAP templatePcap = new PCAP(new 
ByteBufferReader(pcapHeader));
+
+            while (bufferedStream.available() > 0) {
+
+                Packet currentPacket = getNextPacket(bufferedStream, 
templatePcap, totalPackets);
+
+                if (currentPcapTotalLength + currentPacket.totalLength() > 
this.pcapMaxSize) {
+
+                    templatePcap.getPackets().addAll(loadedPackets);
+                    FlowFile newFlowFile = session.create(originalFlowFile);
+                    try (final OutputStream out = session.write(newFlowFile)) {
+                        out.write(templatePcap.toByteArray());
+                        this.splitFiles.add(newFlowFile);
+                    }
+
+                    loadedPackets.clear();
+                    currentPcapTotalLength = PCAPHeader.PCAP_HEADER_LENGTH;
+                    templatePcap.getPackets().clear();
+                }
+
+                loadedPackets.add(currentPacket);
+                totalPackets++;
+                currentPcapTotalLength += currentPacket.totalLength();
+            }
+
+            // If there are any packets left over, create a new flowfile.
+            if (!loadedPackets.isEmpty()) {
+                templatePcap.getPackets().addAll(loadedPackets);
+                FlowFile newFlowFile = session.create(originalFlowFile);
+                try (final OutputStream out = session.write(newFlowFile)) {
+                    out.write(templatePcap.toByteArray());
+                    this.splitFiles.add(newFlowFile);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index a5dfac9f70..c4496962a6 100644
--- 
a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,4 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.processors.network.ParseNetflowv5
\ No newline at end of file
+org.apache.nifi.processors.network.ParseNetflowv5
+org.apache.nifi.processors.network.pcap.SplitPCAP
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/test/java/org/apache/nifi/processors/network/pcap/TestPCAP.java
 
b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/test/java/org/apache/nifi/processors/network/pcap/TestPCAP.java
new file mode 100644
index 0000000000..a619824cd8
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/test/java/org/apache/nifi/processors/network/pcap/TestPCAP.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network.pcap;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestPCAP {
+    private static final byte[] PACKET_DATA = new byte[]{
+            0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
+            10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
+            20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
+    };
+
+    private static final int[][] PCAP_HEADER_VALUES = new int[][]{
+            new int[]{2, 2},
+            new int[]{4, 2},
+            new int[]{0, 4},
+            new int[]{0, 4},
+            new int[]{40, 4},
+            new int[]{1, 4}
+    };
+
+    private static final Map<String, Long> packetHeaderValues = Map.of(
+            "tsSec", 1713184965L,
+            "tsUsec", 1000L,
+            "inclLen", 30L,
+            "origLen", 30L
+    );
+
+    @Test
+    void testReadBytesFull() {
+
+        // Create a header for the test PCAP
+        ByteBuffer headerBuffer = 
ByteBuffer.allocate(PCAPHeader.PCAP_HEADER_LENGTH);
+        headerBuffer.put(new byte[]{(byte) 0xa1, (byte) 0xb2, (byte) 0xc3, 
(byte) 0xd4});
+        for (int[] value : PCAP_HEADER_VALUES) {
+            headerBuffer.put(PCAP.readIntToNBytes(value[0], value[1]));
+        }
+        PCAPHeader hdr = new PCAPHeader(new 
ByteBufferReader(headerBuffer.array()));
+        // Create a sample packet
+        List<Packet> packets = new ArrayList<>();
+        packets.add(new Packet(
+                packetHeaderValues.get("tsSec"),
+                packetHeaderValues.get("tsUsec"),
+                packetHeaderValues.get("inclLen"),
+                packetHeaderValues.get("origLen"),
+                PACKET_DATA
+        ));
+
+        // create test PCAP
+        PCAP testPcap = new PCAP(hdr, packets);
+
+        // Call the readBytesFull method
+        byte[] result = testPcap.toByteArray();
+
+        // Assert the expected byte array length
+        assertEquals(70, result.length);
+
+        // Assert the expected byte array values
+        ByteBuffer buffer = ByteBuffer.wrap(result);
+        assertEquals(0xa1b2c3d4, buffer.getInt());
+        ByteBuffer litteEndianBuffer = 
ByteBuffer.wrap(result).order(ByteOrder.LITTLE_ENDIAN);
+        litteEndianBuffer.position(4);
+
+        for (int[] value : PCAP_HEADER_VALUES) {
+            if (value[1] == 2) {
+                assertEquals(value[0], litteEndianBuffer.getShort());
+            } else {
+                assertEquals(value[0], litteEndianBuffer.getInt());
+            }
+        }
+
+        assertEquals(packetHeaderValues.get("tsSec"), 
litteEndianBuffer.getInt());
+        assertEquals(packetHeaderValues.get("tsUsec"), 
litteEndianBuffer.getInt());
+        assertEquals(packetHeaderValues.get("inclLen"), 
litteEndianBuffer.getInt());
+        assertEquals(packetHeaderValues.get("origLen"), 
litteEndianBuffer.getInt());
+        byte[] body = new byte[30];
+        litteEndianBuffer.get(40, body, 0, 30);
+        assertArrayEquals(body, PACKET_DATA);
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/test/java/org/apache/nifi/processors/network/pcap/TestSplitPCAP.java
 
b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/test/java/org/apache/nifi/processors/network/pcap/TestSplitPCAP.java
new file mode 100644
index 0000000000..bcc5ef7cba
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/test/java/org/apache/nifi/processors/network/pcap/TestSplitPCAP.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network.pcap;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class TestSplitPCAP {
+
+    private PCAPHeader pcapHeader;
+    private Packet validPacket;
+    private Packet invalidPacket;
+
+    protected static final long PACKET_TIMESTAMP = 1713184965;
+    protected static final long PACKET_INTERVAL = 1000;
+    private static final byte[] PACKET_DATA = new byte[]{
+            0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
+            10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
+            20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
+    };
+
+    @BeforeEach
+    void init() {
+
+        ByteBuffer headerBuffer = 
ByteBuffer.allocate(PCAPHeader.PCAP_HEADER_LENGTH);
+        headerBuffer.put(new byte[]{(byte) 0xa1, (byte) 0xb2, (byte) 0xc3, 
(byte) 0xd4});
+        headerBuffer.put(PCAP.readIntToNBytes(2, 2));
+        headerBuffer.put(PCAP.readIntToNBytes(4, 2));
+        headerBuffer.put(PCAP.readIntToNBytes(0, 4));
+        headerBuffer.put(PCAP.readIntToNBytes(0, 4));
+        headerBuffer.put(PCAP.readIntToNBytes(4000, 4));
+        headerBuffer.put(PCAP.readIntToNBytes(1, 4));
+        this.pcapHeader = new PCAPHeader(new 
ByteBufferReader(headerBuffer.array()));
+
+        this.validPacket = new Packet(
+                PACKET_TIMESTAMP,
+                PACKET_INTERVAL,
+                30,
+                30,
+                PACKET_DATA
+        );
+
+        this.invalidPacket = new Packet(
+                PACKET_TIMESTAMP,
+                PACKET_INTERVAL,
+                50,
+                10,
+                PACKET_DATA
+        );
+
+    }
+
+    void executeTest(String pcapMaxSize, List<Packet> packets, 
Map<Relationship, Integer> expectedRelations) {
+        TestRunner runner = TestRunners.newTestRunner(SplitPCAP.class);
+        runner.setProperty(SplitPCAP.PCAP_MAX_SIZE, pcapMaxSize);
+
+        PCAP testPcap = new PCAP(this.pcapHeader, packets);
+
+        runner.enqueue(testPcap.toByteArray());
+
+        runner.run();
+
+        for (Map.Entry<Relationship, Integer> entry : 
expectedRelations.entrySet()) {
+            runner.assertTransferCount(entry.getKey(), entry.getValue());
+        }
+
+        runner.assertQueueEmpty();
+    }
+
+    @Test
+    void testSuccesses() {
+        executeTest(
+                "100B",
+                Collections.nCopies(3, this.validPacket),
+                Map.of(
+                        SplitPCAP.REL_SPLIT, 3,
+                        SplitPCAP.REL_ORIGINAL, 1
+                )
+        );
+        executeTest(
+                "50B",
+                Collections.nCopies(3, this.validPacket),
+                Map.of(
+                        SplitPCAP.REL_SPLIT, 4,
+                        SplitPCAP.REL_ORIGINAL, 1
+                )
+        );
+    }
+
+    @Test
+    void testFailures() {
+        executeTest(
+                "50B",
+                Collections.nCopies(3, this.invalidPacket),
+                Map.of(SplitPCAP.REL_FAILURE, 1)
+        );
+        executeTest(
+                "10B",
+                Collections.nCopies(3, this.validPacket),
+                Map.of(SplitPCAP.REL_FAILURE, 1)
+        );
+
+        List<Packet> mixedValidityPackets = new 
ArrayList<>(Collections.nCopies(3, this.validPacket));
+        mixedValidityPackets.add(this.invalidPacket);
+        executeTest(
+                "50B",
+                mixedValidityPackets,
+                Map.of(SplitPCAP.REL_FAILURE, 1)
+        );
+    }
+}
\ No newline at end of file


Reply via email to