DRILL-6191: Add acknowledgement sequence number and flags fields, details for 
flags

closes #1134


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/40894225
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/40894225
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/40894225

Branch: refs/heads/master
Commit: 408942259800d9987f4e84b3cdbd47e29920e934
Parents: 4bd3cc2
Author: Ted Dunning <ted.dunn...@gmail.com>
Authored: Tue Jan 2 16:20:35 2018 -0800
Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com>
Committed: Sat Mar 3 19:47:43 2018 +0200

----------------------------------------------------------------------
 .../drill/exec/store/pcap/PcapDrillTable.java   |   2 +
 .../drill/exec/store/pcap/PcapRecordReader.java |  75 ++++++++++++++
 .../drill/exec/store/pcap/decoder/Packet.java   | 102 ++++++++++++++++++-
 .../store/pcap/decoder/PacketConstants.java     |   3 +
 .../drill/exec/store/pcap/schema/PcapTypes.java |   1 +
 .../drill/exec/store/pcap/schema/Schema.java    |  14 +++
 .../exec/store/pcap/TestPcapRecordReader.java   |  40 ++++++--
 .../src/test/resources/store/pcap/synscan.pcap  | Bin 0 -> 148872 bytes
 8 files changed, 224 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapDrillTable.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapDrillTable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapDrillTable.java
index 2fbf67d..20e7e93 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapDrillTable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapDrillTable.java
@@ -53,6 +53,8 @@ public class PcapDrillTable extends DrillTable {
         return typeFactory.createSqlType(SqlTypeName.BIGINT);
       case INTEGER:
         return typeFactory.createSqlType(SqlTypeName.INTEGER);
+      case BOOLEAN:
+        return typeFactory.createSqlType(SqlTypeName.BOOLEAN);
       case STRING:
         return typeFactory.createSqlType(SqlTypeName.VARCHAR);
       case TIMESTAMP:

http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
index 26e1e65..d01b746 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
@@ -280,6 +280,76 @@ public class PcapRecordReader extends AbstractRecordReader 
{
             setIntegerColumnValue(packet.getSequenceNumber(), pci, count);
           }
           break;
+        case "tcp_ack":
+          if (packet.isTcpPacket()) {
+            setIntegerColumnValue(packet.getAckNumber(), pci, count);
+          }
+          break;
+        case "tcp_flags":
+          if (packet.isTcpPacket()) {
+            setIntegerColumnValue(packet.getFlags(), pci, count);
+          }
+          break;
+        case "tcp_parsed_flags":
+          if (packet.isTcpPacket()) {
+            setStringColumnValue(packet.getParsedFlags(), pci, count);
+          }
+          break;
+        case "tcp_flags_ns":
+          if (packet.isTcpPacket()) {
+            setBooleanColumnValue((packet.getFlags() & 0x100) != 0, pci, 
count);
+          }
+          break;
+        case "tcp_flags_cwr":
+          if (packet.isTcpPacket()) {
+            setBooleanColumnValue((packet.getFlags() & 0x80) != 0, pci, count);
+          }
+          break;
+        case "tcp_flags_ece ":
+          if (packet.isTcpPacket()) {
+            setBooleanColumnValue((packet.getFlags() & 0x40) != 0, pci, count);
+          }
+          break;
+        case "tcp_flags_ece_ecn_capable":
+          if (packet.isTcpPacket()) {
+            setBooleanColumnValue((packet.getFlags() & 0x42) == 0x42, pci, 
count);
+          }
+          break;
+        case "tcp_flags_ece_congestion_experienced":
+          if (packet.isTcpPacket()) {
+            setBooleanColumnValue((packet.getFlags() & 0x42) == 0x40, pci, 
count);
+          }
+          break;
+        case "tcp_flags_urg":
+          if (packet.isTcpPacket()) {
+            setBooleanColumnValue((packet.getFlags() & 0x20) != 0, pci, count);
+          }
+          break;
+        case "tcp_flags_ack":
+          if (packet.isTcpPacket()) {
+            setBooleanColumnValue((packet.getFlags() & 0x10) != 0, pci, count);
+          }
+          break;
+        case "tcp_flags_psh":
+          if (packet.isTcpPacket()) {
+            setBooleanColumnValue((packet.getFlags() & 0x8) != 0, pci, count);
+          }
+          break;
+        case "tcp_flags_rst":
+          if (packet.isTcpPacket()) {
+            setBooleanColumnValue((packet.getFlags() & 0x4) != 0, pci, count);
+          }
+          break;
+        case "tcp_flags_syn":
+          if (packet.isTcpPacket()) {
+            setBooleanColumnValue((packet.getFlags() & 0x2) != 0, pci, count);
+          }
+          break;
+        case "tcp_flags_fin":
+          if (packet.isTcpPacket()) {
+            setBooleanColumnValue((packet.getFlags() & 0x1) != 0, pci, count);
+          }
+          break;
         case "packet_length":
           setIntegerColumnValue(packet.getPacketLength(), pci, count);
           break;
@@ -305,6 +375,11 @@ public class PcapRecordReader extends AbstractRecordReader 
{
         .setSafe(count, data);
   }
 
+  private void setBooleanColumnValue(final boolean data, final 
ProjectedColumnInfo pci, final int count) {
+    ((NullableIntVector.Mutator) pci.vv.getMutator())
+        .setSafe(count, data ? 1 : 0);
+  }
+
   private void setTimestampColumnValue(final long data, final 
ProjectedColumnInfo pci, final int count) {
     ((NullableTimeStampVector.Mutator) pci.vv.getMutator())
         .setSafe(count, data);

http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
index 0a45290..9cc98de 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Formatter;
 
 import static org.apache.drill.exec.store.pcap.PcapFormatUtils.convertInt;
 import static org.apache.drill.exec.store.pcap.PcapFormatUtils.convertShort;
@@ -43,7 +44,9 @@ public class Packet {
 
   private byte[] raw;
 
+  // index into the raw data where the current ethernet packet starts
   private int etherOffset;
+  // index into the raw data where the current IP packet starts. Should be 
just after etherOffset
   private int ipOffset;
 
   private int packetLength;
@@ -180,13 +183,104 @@ public class Packet {
 
   public int getSequenceNumber() {
     if (isTcpPacket()) {
-      int sequenceOffset = PacketConstants.ETHER_HEADER_LENGTH + 
getIPHeaderLength() + getTCPHeaderLength(raw) + 4;
-      return Math.abs(convertInt(raw, sequenceOffset));
+      return convertInt(raw, ipOffset + getIPHeaderLength() + 
PacketConstants.TCP_SEQUENCE_OFFSET);
     } else {
       return 0;
     }
   }
 
+  public int getAckNumber() {
+    if (isTcpPacket()) {
+        return convertInt(raw, ipOffset + getIPHeaderLength() + 
PacketConstants.TCP_ACK_OFFSET);
+    } else {
+      return 0;
+    }
+  }
+
+  public int getFlags() {
+    if (isTcpPacket()) {
+      return convertShort(raw, ipOffset + getIPHeaderLength() + 
PacketConstants.TCP_FLAG_OFFSET) & 0xfff;
+    } else {
+      return 0;
+    }
+  }
+
+  public String getParsedFlags() {
+    return formatFlags(getFlags());
+  }
+
+  public static String formatFlags(int flags) {
+    int mask = 0x100;
+    StringBuilder r = new StringBuilder();
+    String separator = "";
+    if ((flags & mask) != 0) {
+      r.append(separator);
+      r.append("NS");
+      separator = "|";
+    }
+    mask = mask >> 1;
+
+    if ((flags & mask) != 0) {
+      r.append(separator);
+      r.append("CWR");
+      separator = "|";
+    }
+    mask = mask >> 1;
+
+    if ((flags & mask) != 0) {
+      r.append(separator);
+      r.append("ECE");
+      if ((flags & 2) != 0) {
+        r.append(" (ECN capable)");
+      } else {
+        r.append(" (Congestion experienced)");
+      }
+      separator = "|";
+    }
+    mask = mask >> 1;
+
+    if ((flags & mask) != 0) {
+      r.append(separator);
+      r.append("URG");
+      separator = "|";
+    }
+    mask = mask >> 1;
+
+    if ((flags & mask) != 0) {
+      r.append(separator);
+      r.append("ACK");
+      separator = "|";
+    }
+    mask = mask >> 1;
+
+    if ((flags & mask) != 0) {
+      r.append(separator);
+      r.append("PSH");
+      separator = "|";
+    }
+    mask = mask >> 1;
+
+    if ((flags & mask) != 0) {
+      r.append(separator);
+      r.append("RST");
+      separator = "|";
+    }
+    mask = mask >> 1;
+
+    if ((flags & mask) != 0) {
+      r.append(separator);
+      r.append("SYN");
+      separator = "|";
+    }
+    mask = mask >> 1;
+
+    if ((flags & mask) != 0) {
+      r.append(separator);
+      r.append("FIN");
+    }
+    return r.toString();
+  }
+
   public int getSrc_port() {
     if (isPPPoV6Packet()) {
       return getPort(64);
@@ -361,9 +455,9 @@ public class Packet {
   private String getEthernetAddress(int offset) {
     byte[] r = new byte[6];
     System.arraycopy(raw, etherOffset + offset, r, 0, 6);
-    StringBuilder sb = new StringBuilder();
+    Formatter sb = new Formatter();
     for (int i = 0; i < r.length; i++) {
-      sb.append(String.format("%02X%s", r[i], (i < r.length - 1) ? ":" : ""));
+      sb.format("%02X%s", r[i], (i < r.length - 1) ? ":" : "");
     }
     return sb.toString();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
index 2c87623..6f29253 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
@@ -65,4 +65,7 @@ public final class PacketConstants {
 
   public static final int PPPoV6_IP_OFFSET = 28;
 
+  public static final int TCP_SEQUENCE_OFFSET = 4;
+  public static final int TCP_ACK_OFFSET = 8;
+  public static final int TCP_FLAG_OFFSET = 12;
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
index 5c6df71..fc6e029 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.pcap.schema;
 
 public enum PcapTypes {
+  BOOLEAN,
   INTEGER,
   STRING,
   LONG,

http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
index b3e7722..89bd08f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
@@ -41,6 +41,20 @@ public class Schema {
     columns.add(new ColumnDto("src_mac_address", PcapTypes.STRING));
     columns.add(new ColumnDto("dst_mac_address", PcapTypes.STRING));
     columns.add(new ColumnDto("tcp_session", PcapTypes.LONG));
+    columns.add(new ColumnDto("tcp_ack", PcapTypes.INTEGER));
+    columns.add(new ColumnDto("tcp_flags", PcapTypes.INTEGER));
+    columns.add(new ColumnDto("tcp_flags_ns", PcapTypes.INTEGER));
+    columns.add(new ColumnDto("tcp_flags_cwr", PcapTypes.INTEGER));
+    columns.add(new ColumnDto("tcp_flags_ece ", PcapTypes.INTEGER ));
+    columns.add(new ColumnDto("tcp_flags_ece_ecn_capable", PcapTypes.INTEGER 
));
+    columns.add(new ColumnDto("tcp_flags_ece_congestion_experienced", 
PcapTypes.INTEGER ));
+    columns.add(new ColumnDto("tcp_flags_urg", PcapTypes.INTEGER ));
+    columns.add(new ColumnDto("tcp_flags_ack", PcapTypes.INTEGER ));
+    columns.add(new ColumnDto("tcp_flags_psh", PcapTypes.INTEGER ));
+    columns.add(new ColumnDto("tcp_flags_rst", PcapTypes.INTEGER ));
+    columns.add(new ColumnDto("tcp_flags_syn", PcapTypes.INTEGER ));
+    columns.add(new ColumnDto("tcp_flags_fin", PcapTypes.INTEGER ));
+    columns.add(new ColumnDto("tcp_parsed_flags", PcapTypes.STRING));
     columns.add(new ColumnDto("packet_length", PcapTypes.INTEGER));
     columns.add(new ColumnDto("data", PcapTypes.STRING));
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
index bb81469..385c0e0 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
@@ -16,16 +16,17 @@
  */
 package org.apache.drill.exec.store.pcap;
 
+import org.apache.drill.exec.store.pcap.decoder.Packet;
 import org.apache.drill.test.BaseTestQuery;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.nio.file.Paths;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+
 public class TestPcapRecordReader extends BaseTestQuery {
   @BeforeClass
   public static void setupTestFiles() {
@@ -52,21 +53,42 @@ public class TestPcapRecordReader extends BaseTestQuery {
     runSQLVerifyCount("select distinct type, network, `timestamp`, src_ip, 
dst_ip, src_port, dst_port, src_mac_address, dst_mac_address, tcp_session, 
packet_length from dfs.`store/pcap/tcp-1.pcap`", 1);
   }
 
+  @Test
+  public void testFlagFormatting() {
+    assertEquals("NS", Packet.formatFlags(0x100));
+    assertEquals("CWR", Packet.formatFlags(0x80));
+    assertEquals("ECE", Packet.formatFlags(0x40).substring(0, 3));
+    assertEquals("ECE", Packet.formatFlags(0x42).substring(0, 3));
+    assertEquals("URG", Packet.formatFlags(0x20));
+    assertEquals("ACK", Packet.formatFlags(0x10));
+    assertEquals("PSH", Packet.formatFlags(0x8));
+    assertEquals("RST", Packet.formatFlags(0x4));
+    assertEquals("SYN", Packet.formatFlags(0x2));
+    assertEquals("FIN", Packet.formatFlags(0x1));
+    assertEquals("RST|SYN|FIN", Packet.formatFlags(0x7));
+  }
+
+  @Test
+  public void checkFlags() throws Exception {
+    runSQLVerifyCount("select tcp_session, tcp_ack, tcp_flags from 
dfs.`store/pcap/synscan.pcap`", 2011);
+  }
+
   private void runSQLVerifyCount(String sql, int expectedRowCount) throws 
Exception {
     List<QueryDataBatch> results = runSQLWithResults(sql);
-    printResultAndVerifyRowCount(results, expectedRowCount);
+    verifyRowCount(results, expectedRowCount);
   }
 
   private List<QueryDataBatch> runSQLWithResults(String sql) throws Exception {
     return testSqlWithResults(sql);
   }
 
-  private void printResultAndVerifyRowCount(List<QueryDataBatch> results,
-                                            int expectedRowCount) throws 
SchemaChangeException {
-    setColumnWidth(35);
-    int rowCount = printResult(results);
-    if (expectedRowCount != -1) {
-      Assert.assertEquals(expectedRowCount, rowCount);
+  private void verifyRowCount(List<QueryDataBatch> results, int 
expectedRowCount) {
+    int count = 0;
+    for (final QueryDataBatch result : results) {
+      count += result.getHeader().getRowCount();
+      result.release();
     }
+    assertEquals(expectedRowCount, count);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/40894225/exec/java-exec/src/test/resources/store/pcap/synscan.pcap
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/pcap/synscan.pcap 
b/exec/java-exec/src/test/resources/store/pcap/synscan.pcap
new file mode 100644
index 0000000..8c2ca36
Binary files /dev/null and 
b/exec/java-exec/src/test/resources/store/pcap/synscan.pcap differ

Reply via email to