DRILL-6191: Add acknowledgement sequence number and flags fields, details for flags
closes #1134 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/40894225 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/40894225 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/40894225 Branch: refs/heads/master Commit: 408942259800d9987f4e84b3cdbd47e29920e934 Parents: 4bd3cc2 Author: Ted Dunning <ted.dunn...@gmail.com> Authored: Tue Jan 2 16:20:35 2018 -0800 Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com> Committed: Sat Mar 3 19:47:43 2018 +0200 ---------------------------------------------------------------------- .../drill/exec/store/pcap/PcapDrillTable.java | 2 + .../drill/exec/store/pcap/PcapRecordReader.java | 75 ++++++++++++++ .../drill/exec/store/pcap/decoder/Packet.java | 102 ++++++++++++++++++- .../store/pcap/decoder/PacketConstants.java | 3 + .../drill/exec/store/pcap/schema/PcapTypes.java | 1 + .../drill/exec/store/pcap/schema/Schema.java | 14 +++ .../exec/store/pcap/TestPcapRecordReader.java | 40 ++++++-- .../src/test/resources/store/pcap/synscan.pcap | Bin 0 -> 148872 bytes 8 files changed, 224 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapDrillTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapDrillTable.java index 2fbf67d..20e7e93 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapDrillTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapDrillTable.java @@ -53,6 +53,8 @@ public class PcapDrillTable extends DrillTable { return typeFactory.createSqlType(SqlTypeName.BIGINT); case INTEGER: return typeFactory.createSqlType(SqlTypeName.INTEGER); + case BOOLEAN: + return typeFactory.createSqlType(SqlTypeName.BOOLEAN); case STRING: return typeFactory.createSqlType(SqlTypeName.VARCHAR); case TIMESTAMP: http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java index 26e1e65..d01b746 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java @@ -280,6 +280,76 @@ public class PcapRecordReader extends AbstractRecordReader { setIntegerColumnValue(packet.getSequenceNumber(), pci, count); } break; + case "tcp_ack": + if (packet.isTcpPacket()) { + setIntegerColumnValue(packet.getAckNumber(), pci, count); + } + break; + case "tcp_flags": + if (packet.isTcpPacket()) { + setIntegerColumnValue(packet.getFlags(), pci, count); + } + break; + case "tcp_parsed_flags": + if (packet.isTcpPacket()) { + setStringColumnValue(packet.getParsedFlags(), pci, count); + } + break; + case "tcp_flags_ns": + if (packet.isTcpPacket()) { + setBooleanColumnValue((packet.getFlags() & 0x100) != 0, pci, count); + } + break; + case "tcp_flags_cwr": + if (packet.isTcpPacket()) { + setBooleanColumnValue((packet.getFlags() & 0x80) != 0, pci, count); + } + break; + case "tcp_flags_ece ": + if (packet.isTcpPacket()) { + setBooleanColumnValue((packet.getFlags() & 0x40) != 0, pci, count); + } + break; + case "tcp_flags_ece_ecn_capable": + if (packet.isTcpPacket()) { + setBooleanColumnValue((packet.getFlags() & 0x42) == 0x42, pci, count); + } + break; + case "tcp_flags_ece_congestion_experienced": + if (packet.isTcpPacket()) { + setBooleanColumnValue((packet.getFlags() & 0x42) == 0x40, pci, count); + } + break; + case "tcp_flags_urg": + if (packet.isTcpPacket()) { + setBooleanColumnValue((packet.getFlags() & 0x20) != 0, pci, count); + } + break; + case "tcp_flags_ack": + if (packet.isTcpPacket()) { + setBooleanColumnValue((packet.getFlags() & 0x10) != 0, pci, count); + } + break; + case "tcp_flags_psh": + if (packet.isTcpPacket()) { + setBooleanColumnValue((packet.getFlags() & 0x8) != 0, pci, count); + } + break; + case "tcp_flags_rst": + if (packet.isTcpPacket()) { + setBooleanColumnValue((packet.getFlags() & 0x4) != 0, pci, count); + } + break; + case "tcp_flags_syn": + if (packet.isTcpPacket()) { + setBooleanColumnValue((packet.getFlags() & 0x2) != 0, pci, count); + } + break; + case "tcp_flags_fin": + if (packet.isTcpPacket()) { + setBooleanColumnValue((packet.getFlags() & 0x1) != 0, pci, count); + } + break; case "packet_length": setIntegerColumnValue(packet.getPacketLength(), pci, count); break; @@ -305,6 +375,11 @@ public class PcapRecordReader extends AbstractRecordReader { .setSafe(count, data); } + private void setBooleanColumnValue(final boolean data, final ProjectedColumnInfo pci, final int count) { + ((NullableIntVector.Mutator) pci.vv.getMutator()) + .setSafe(count, data ? 1 : 0); + } + private void setTimestampColumnValue(final long data, final ProjectedColumnInfo pci, final int count) { ((NullableTimeStampVector.Mutator) pci.vv.getMutator()) .setSafe(count, data); http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java index 0a45290..9cc98de 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Formatter; import static org.apache.drill.exec.store.pcap.PcapFormatUtils.convertInt; import static org.apache.drill.exec.store.pcap.PcapFormatUtils.convertShort; @@ -43,7 +44,9 @@ public class Packet { private byte[] raw; + // index into the raw data where the current ethernet packet starts private int etherOffset; + // index into the raw data where the current IP packet starts. Should be just after etherOffset private int ipOffset; private int packetLength; @@ -180,13 +183,104 @@ public class Packet { public int getSequenceNumber() { if (isTcpPacket()) { - int sequenceOffset = PacketConstants.ETHER_HEADER_LENGTH + getIPHeaderLength() + getTCPHeaderLength(raw) + 4; - return Math.abs(convertInt(raw, sequenceOffset)); + return convertInt(raw, ipOffset + getIPHeaderLength() + PacketConstants.TCP_SEQUENCE_OFFSET); } else { return 0; } } + public int getAckNumber() { + if (isTcpPacket()) { + return convertInt(raw, ipOffset + getIPHeaderLength() + PacketConstants.TCP_ACK_OFFSET); + } else { + return 0; + } + } + + public int getFlags() { + if (isTcpPacket()) { + return convertShort(raw, ipOffset + getIPHeaderLength() + PacketConstants.TCP_FLAG_OFFSET) & 0xfff; + } else { + return 0; + } + } + + public String getParsedFlags() { + return formatFlags(getFlags()); + } + + public static String formatFlags(int flags) { + int mask = 0x100; + StringBuilder r = new StringBuilder(); + String separator = ""; + if ((flags & mask) != 0) { + r.append(separator); + r.append("NS"); + separator = "|"; + } + mask = mask >> 1; + + if ((flags & mask) != 0) { + r.append(separator); + r.append("CWR"); + separator = "|"; + } + mask = mask >> 1; + + if ((flags & mask) != 0) { + r.append(separator); + r.append("ECE"); + if ((flags & 2) != 0) { + r.append(" (ECN capable)"); + } else { + r.append(" (Congestion experienced)"); + } + separator = "|"; + } + mask = mask >> 1; + + if ((flags & mask) != 0) { + r.append(separator); + r.append("URG"); + separator = "|"; + } + mask = mask >> 1; + + if ((flags & mask) != 0) { + r.append(separator); + r.append("ACK"); + separator = "|"; + } + mask = mask >> 1; + + if ((flags & mask) != 0) { + r.append(separator); + r.append("PSH"); + separator = "|"; + } + mask = mask >> 1; + + if ((flags & mask) != 0) { + r.append(separator); + r.append("RST"); + separator = "|"; + } + mask = mask >> 1; + + if ((flags & mask) != 0) { + r.append(separator); + r.append("SYN"); + separator = "|"; + } + mask = mask >> 1; + + if ((flags & mask) != 0) { + r.append(separator); + r.append("FIN"); + } + return r.toString(); + } + public int getSrc_port() { if (isPPPoV6Packet()) { return getPort(64); @@ -361,9 +455,9 @@ public class Packet { private String getEthernetAddress(int offset) { byte[] r = new byte[6]; System.arraycopy(raw, etherOffset + offset, r, 0, 6); - StringBuilder sb = new StringBuilder(); + Formatter sb = new Formatter(); for (int i = 0; i < r.length; i++) { - sb.append(String.format("%02X%s", r[i], (i < r.length - 1) ? ":" : "")); + sb.format("%02X%s", r[i], (i < r.length - 1) ? ":" : ""); } return sb.toString(); } http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java index 2c87623..6f29253 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java @@ -65,4 +65,7 @@ public final class PacketConstants { public static final int PPPoV6_IP_OFFSET = 28; + public static final int TCP_SEQUENCE_OFFSET = 4; + public static final int TCP_ACK_OFFSET = 8; + public static final int TCP_FLAG_OFFSET = 12; } http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java index 5c6df71..fc6e029 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.store.pcap.schema; public enum PcapTypes { + BOOLEAN, INTEGER, STRING, LONG, http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java index b3e7722..89bd08f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java @@ -41,6 +41,20 @@ public class Schema { columns.add(new ColumnDto("src_mac_address", PcapTypes.STRING)); columns.add(new ColumnDto("dst_mac_address", PcapTypes.STRING)); columns.add(new ColumnDto("tcp_session", PcapTypes.LONG)); + columns.add(new ColumnDto("tcp_ack", PcapTypes.INTEGER)); + columns.add(new ColumnDto("tcp_flags", PcapTypes.INTEGER)); + columns.add(new ColumnDto("tcp_flags_ns", PcapTypes.INTEGER)); + columns.add(new ColumnDto("tcp_flags_cwr", PcapTypes.INTEGER)); + columns.add(new ColumnDto("tcp_flags_ece ", PcapTypes.INTEGER )); + columns.add(new ColumnDto("tcp_flags_ece_ecn_capable", PcapTypes.INTEGER )); + columns.add(new ColumnDto("tcp_flags_ece_congestion_experienced", PcapTypes.INTEGER )); + columns.add(new ColumnDto("tcp_flags_urg", PcapTypes.INTEGER )); + columns.add(new ColumnDto("tcp_flags_ack", PcapTypes.INTEGER )); + columns.add(new ColumnDto("tcp_flags_psh", PcapTypes.INTEGER )); + columns.add(new ColumnDto("tcp_flags_rst", PcapTypes.INTEGER )); + columns.add(new ColumnDto("tcp_flags_syn", PcapTypes.INTEGER )); + columns.add(new ColumnDto("tcp_flags_fin", PcapTypes.INTEGER )); + columns.add(new ColumnDto("tcp_parsed_flags", PcapTypes.STRING)); columns.add(new ColumnDto("packet_length", PcapTypes.INTEGER)); columns.add(new ColumnDto("data", PcapTypes.STRING)); } http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java index bb81469..385c0e0 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java @@ -16,16 +16,17 @@ */ package org.apache.drill.exec.store.pcap; +import org.apache.drill.exec.store.pcap.decoder.Packet; import org.apache.drill.test.BaseTestQuery; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.rpc.user.QueryDataBatch; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import java.nio.file.Paths; import java.util.List; +import static org.junit.Assert.assertEquals; + public class TestPcapRecordReader extends BaseTestQuery { @BeforeClass public static void setupTestFiles() { @@ -52,21 +53,42 @@ public class TestPcapRecordReader extends BaseTestQuery { runSQLVerifyCount("select distinct type, network, `timestamp`, src_ip, dst_ip, src_port, dst_port, src_mac_address, dst_mac_address, tcp_session, packet_length from dfs.`store/pcap/tcp-1.pcap`", 1); } + @Test + public void testFlagFormatting() { + assertEquals("NS", Packet.formatFlags(0x100)); + assertEquals("CWR", Packet.formatFlags(0x80)); + assertEquals("ECE", Packet.formatFlags(0x40).substring(0, 3)); + assertEquals("ECE", Packet.formatFlags(0x42).substring(0, 3)); + assertEquals("URG", Packet.formatFlags(0x20)); + assertEquals("ACK", Packet.formatFlags(0x10)); + assertEquals("PSH", Packet.formatFlags(0x8)); + assertEquals("RST", Packet.formatFlags(0x4)); + assertEquals("SYN", Packet.formatFlags(0x2)); + assertEquals("FIN", Packet.formatFlags(0x1)); + assertEquals("RST|SYN|FIN", Packet.formatFlags(0x7)); + } + + @Test + public void checkFlags() throws Exception { + runSQLVerifyCount("select tcp_session, tcp_ack, tcp_flags from dfs.`store/pcap/synscan.pcap`", 2011); + } + private void runSQLVerifyCount(String sql, int expectedRowCount) throws Exception { List<QueryDataBatch> results = runSQLWithResults(sql); - printResultAndVerifyRowCount(results, expectedRowCount); + verifyRowCount(results, expectedRowCount); } private List<QueryDataBatch> runSQLWithResults(String sql) throws Exception { return testSqlWithResults(sql); } - private void printResultAndVerifyRowCount(List<QueryDataBatch> results, - int expectedRowCount) throws SchemaChangeException { - setColumnWidth(35); - int rowCount = printResult(results); - if (expectedRowCount != -1) { - Assert.assertEquals(expectedRowCount, rowCount); + private void verifyRowCount(List<QueryDataBatch> results, int expectedRowCount) { + int count = 0; + for (final QueryDataBatch result : results) { + count += result.getHeader().getRowCount(); + result.release(); } + assertEquals(expectedRowCount, count); } + } http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/test/resources/store/pcap/synscan.pcap ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/store/pcap/synscan.pcap b/exec/java-exec/src/test/resources/store/pcap/synscan.pcap new file mode 100644 index 0000000..8c2ca36 Binary files /dev/null and b/exec/java-exec/src/test/resources/store/pcap/synscan.pcap differ