[ https://issues.apache.org/jira/browse/DRILL-5432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16021886#comment-16021886 ]
ASF GitHub Bot commented on DRILL-5432:
---------------------------------------

Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/831#discussion_r118107316

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java ---
@@ -0,0 +1,371 @@
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.pcap.decoder; + +import com.google.common.base.Preconditions; + +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; + +import static org.apache.drill.exec.store.pcap.Utils.convertInt; +import static org.apache.drill.exec.store.pcap.Utils.convertShort; +import static org.apache.drill.exec.store.pcap.Utils.getByte; +import static org.apache.drill.exec.store.pcap.Utils.getIntFileOrder; +import static org.apache.drill.exec.store.pcap.Utils.getShort; + +public class Packet { + // pcap header + // typedef struct pcaprec_hdr_s { + // guint32 ts_sec; // timestamp seconds + // guint32 ts_usec; // timestamp microseconds */ + // guint32 incl_len; // number of octets of packet saved in file */ + // guint32 orig_len; // actual length of packet */ + // } pcaprec_hdr_t; + private long timestamp; + private int originalLength; + + private byte[] raw; + + private int etherOffset; + private int ipOffset; + + private int packetLength; + private int etherProtocol; + private int protocol; + + private boolean isRoutingV6; + + @SuppressWarnings("WeakerAccess") + public boolean readPcap(final InputStream in, final boolean byteOrder, final int maxLength) throws IOException { + byte[] pcapHeader = new byte[PacketConstants.PCAP_HEADER_SIZE]; + int n = in.read(pcapHeader); + if (n < pcapHeader.length) { + return false; + } + decodePcapHeader(pcapHeader, byteOrder, maxLength, 0); + + raw = new byte[originalLength]; + n = in.read(raw); + if (n < 0) { + return false; + } + etherOffset = 0; + + decodeEtherPacket(); + return true; + } + + @SuppressWarnings("WeakerAccess") + public int decodePcap(final byte[] buffer, final int offset, final boolean byteOrder, final int maxLength) { + raw = buffer; + etherOffset = offset + PacketConstants.PCAP_HEADER_SIZE; + decodePcapHeader(raw, byteOrder, maxLength, offset); + decodeEtherPacket(); + return offset + PacketConstants.PCAP_HEADER_SIZE + 
originalLength; + } + + public String getPacketType() { + if (isTcpPacket()) { + return "TCP"; + } else if (isUdpPacket()) { + return "UDP"; + } else if (isArpPacket()) { + return "ARP"; + } else if (isIcmpPacket()) { + return "ICMP"; + } else { + return "unknown"; + } + } + + @SuppressWarnings("WeakerAccess") + public boolean isIpV4Packet() { + return etherProtocol == PacketConstants.IPv4_TYPE; + } + + @SuppressWarnings("WeakerAccess") + public boolean isIpV6Packet() { + return etherProtocol == PacketConstants.IPv6_TYPE; + } + + @SuppressWarnings("WeakerAccess") + public boolean isPPPoV6Packet() { + return etherProtocol == PacketConstants.PPPoV6_TYPE; + } + + @SuppressWarnings("WeakerAccess") + public boolean isTcpPacket() { + return protocol == PacketConstants.TCP_PROTOCOL; + } + + @SuppressWarnings("WeakerAccess") + public boolean isUdpPacket() { + return protocol == PacketConstants.UDP_PROTOCOL; + } + + @SuppressWarnings("WeakerAccess") + public boolean isArpPacket() { + return protocol == PacketConstants.ARP_PROTOCOL; + } + + @SuppressWarnings("WeakerAccess") + public boolean isIcmpPacket() { + return protocol == PacketConstants.ICMP_PROTOCOL; + } + + public long getSessionHash() { + if (isTcpPacket()) { + Murmur128 h1 = new Murmur128(1, 2); + byte[] buf = getIpAddressBytes(true); + assert buf != null; + h1.hash(buf, 0, buf.length); + h1.hash(getSrc_port()); + + Murmur128 h2 = new Murmur128(1, 2); + buf = getIpAddressBytes(false); + assert buf != null; + h2.hash(buf, 0, buf.length); + h2.hash(getDst_port()); + + return h1.digest64() ^ h2.digest64(); + } else { + return 0; + } + } + + public long getTimestamp() { + return timestamp; + } + + public int getPacketLength() { + return packetLength; + } + + public InetAddress getSrc_ip() { + return getIPAddress(true); + } + + public InetAddress getDst_ip() { + return getIPAddress(false); + } + + public String getEthernetSource() { + return getEthernetAddress(PacketConstants.ETHER_SRC_OFFSET); + } + + public String 
getEthernetDestination() { + return getEthernetAddress(PacketConstants.ETHER_DST_OFFSET); + } + + public int getSequenceNumber() { + if (isTcpPacket()) { + int sequenceOffset = PacketConstants.ETHER_HEADER_LENGTH + getIPHeaderLength() + getTCPHeaderLength(raw) + 4; + return Math.abs(convertInt(raw, sequenceOffset)); + } else { + return 0; + } + } + + public int getSrc_port() { + if (isPPPoV6Packet()) { + return getPort(64); + } + if (isIpV6Packet()) { + if (isRoutingV6) { + return getPort(136); + } + return getPort(40); + } + return getPort(0); + } + + public int getDst_port() { + if (isPPPoV6Packet()) { + return getPort(66); + } + if (isIpV6Packet()) { + if (isRoutingV6) { + return getPort(138); + } + return getPort(42); + } + return getPort(2); + } + + public byte[] getData() { + int payloadDataStart = getIPHeaderLength(); + if (isTcpPacket()) { + payloadDataStart += this.getTCPHeaderLength(raw); + } else if (isUdpPacket()) { + payloadDataStart += this.getUDPHeaderLength(); + } else { + return null; + } + byte[] data = null; + if (packetLength >= payloadDataStart) { + data = new byte[packetLength - payloadDataStart]; + System.arraycopy(raw, ipOffset + payloadDataStart, data, 0, data.length); + } + return data; + } + + private InetAddress getIPAddress(final boolean src) { + byte[] ipBuffer = getIpAddressBytes(src); + if (ipBuffer == null) { + return null; + } + try { + return InetAddress.getByAddress(ipBuffer); + } catch (UnknownHostException e) { + return null; + } + } + + private byte[] getIpAddressBytes(final boolean src) { + int srcPos; + byte[] ipBuffer; + if (isIpV4Packet()) { + ipBuffer = new byte[4]; + srcPos = src ? PacketConstants.IP4_SRC_OFFSET : PacketConstants.IP4_DST_OFFSET; + } else if (isIpV6Packet()) { + ipBuffer = new byte[16]; + if (isRoutingV6) { + srcPos = src ? PacketConstants.IP6_SRC_OFFSET + 96 : PacketConstants.IP6_DST_OFFSET + 96; + } else { + srcPos = src ? 
PacketConstants.IP6_SRC_OFFSET : PacketConstants.IP6_DST_OFFSET; + } + } else if (isPPPoV6Packet()) { + ipBuffer = new byte[16]; + srcPos = src ? PacketConstants.IP6_SRC_OFFSET + PacketConstants.PPPoV6_IP_OFFSET : PacketConstants.IP6_DST_OFFSET + PacketConstants.PPPoV6_IP_OFFSET; + } else { + return null;
--- End diff --

What does a null value returned from here mean? An unsupported packet type? If so, caller should throw an UnsupportedXYZ exception, perhaps?

> Want a memory format for PCAP files
> -----------------------------------
>
> Key: DRILL-5432
> URL: https://issues.apache.org/jira/browse/DRILL-5432
> Project: Apache Drill
> Issue Type: New Feature
> Reporter: Ted Dunning
>
> PCAP files [1] are the de facto standard for storing network capture data. In
> security and protocol applications, it is very common to want to extract
> particular packets from a capture for further analysis.
> At a first level, it is desirable to query and filter by source and
> destination IP and port or by protocol. Beyond that, however, it would be
> very useful to be able to group packets by TCP session and eventually to look
> at packet contents. For now, however, the most critical requirement is that
> we should be able to scan captures at very high speed.
> I previously wrote a (kind of working) proof of concept for a PCAP decoder
> that did lazy deserialization and could traverse hundreds of MB of PCAP data
> per second per core. This compares to roughly 2-3 MB/s for widely available
> Apache-compatible open source PCAP decoders.
> This JIRA covers the integration and extension of that proof of concept as a
> Drill file format.
> Initial work is available at https://github.com/mapr-demos/drill-pcap-format
> [1] https://en.wikipedia.org/wiki/Pcap

--
This message was sent by Atlassian JIRA (v6.3.15#6346)