http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/paloalto/BasicPaloAltoFirewallParserTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/paloalto/BasicPaloAltoFirewallParserTest.java
 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/paloalto/BasicPaloAltoFirewallParserTest.java
index 2c90b1e..cc6191c 100644
--- 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/paloalto/BasicPaloAltoFirewallParserTest.java
+++ 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/paloalto/BasicPaloAltoFirewallParserTest.java
@@ -18,6 +18,7 @@
 package org.apache.metron.parsers.paloalto;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 import org.apache.metron.parsers.AbstractParserConfigTest;
 import org.json.simple.JSONObject;
@@ -25,6 +26,8 @@ import org.json.simple.parser.ParseException;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.List;
+
 public class BasicPaloAltoFirewallParserTest extends AbstractParserConfigTest {
 
   @Before
@@ -32,6 +35,221 @@ public class BasicPaloAltoFirewallParserTest extends 
AbstractParserConfigTest {
     parser = new BasicPaloAltoFirewallParser();
   }
 
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testParseSystem61() throws ParseException {
+    final String SYSTEM_61 = "1,2017/08/11 
12:37:58,008900008659,SYSTEM,general,1,2017/08/11 
11:37:58,vsys1,eventId_test,object_test,Futureuse1_test,futureuse2_test,management,high,Description_test,1354,0x0";
+
+    JSONObject actual = parser.parse(SYSTEM_61.getBytes()).get(0);
+
+    JSONObject expected = new JSONObject();
+    expected.put(BasicPaloAltoFirewallParser.PaloAltoDomain, "1");
+    expected.put(BasicPaloAltoFirewallParser.ReceiveTime, "2017/08/11 
12:37:58");
+    expected.put(BasicPaloAltoFirewallParser.SerialNum, "008900008659");
+    expected.put(BasicPaloAltoFirewallParser.Type, "SYSTEM");
+    expected.put(BasicPaloAltoFirewallParser.ThreatContentType, "general");
+    expected.put(BasicPaloAltoFirewallParser.ConfigVersion, "1");
+    expected.put(BasicPaloAltoFirewallParser.GenerateTime, "2017/08/11 
11:37:58");
+    expected.put(BasicPaloAltoFirewallParser.VirtualSystem, "vsys1");
+    expected.put(BasicPaloAltoFirewallParser.EventId, "eventId_test");
+    expected.put(BasicPaloAltoFirewallParser.Object, "object_test");
+    expected.put(BasicPaloAltoFirewallParser.Module, "management");
+    expected.put(BasicPaloAltoFirewallParser.Severity, "high");
+    expected.put(BasicPaloAltoFirewallParser.Description, "Description_test");
+    expected.put(BasicPaloAltoFirewallParser.Seqno, "1354");
+    expected.put(BasicPaloAltoFirewallParser.ActionFlags, "0x0");
+    expected.put(BasicPaloAltoFirewallParser.ParserVersion, 61);
+    expected.put("original_string", SYSTEM_61);
+    expected.put("timestamp", actual.get("timestamp"));
+
+    assertEquals(expected, actual);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testParseSystem80() throws ParseException {
+    final String SYSTEM_80 = "1,2017/08/11 
12:37:58,008900008659,SYSTEM,general,1,2017/08/11 
11:37:58,vsys1,eventId_test,object_test,Futureuse1_test,futureuse2_test,management,high,Description_test,1354,0x0,12,34,45,0,virSys1,dev-something200-01";
+
+    JSONObject actual = parser.parse(SYSTEM_80.getBytes()).get(0);
+
+    JSONObject expected = new JSONObject();
+    expected.put(BasicPaloAltoFirewallParser.PaloAltoDomain, "1");
+    expected.put(BasicPaloAltoFirewallParser.ReceiveTime, "2017/08/11 
12:37:58");
+    expected.put(BasicPaloAltoFirewallParser.SerialNum, "008900008659");
+    expected.put(BasicPaloAltoFirewallParser.Type, "SYSTEM");
+    expected.put(BasicPaloAltoFirewallParser.ThreatContentType, "general");
+    expected.put(BasicPaloAltoFirewallParser.ConfigVersion, "1");
+    expected.put(BasicPaloAltoFirewallParser.GenerateTime, "2017/08/11 
11:37:58");
+    expected.put(BasicPaloAltoFirewallParser.VirtualSystem, "vsys1");
+    expected.put(BasicPaloAltoFirewallParser.EventId, "eventId_test");
+    expected.put(BasicPaloAltoFirewallParser.Object, "object_test");
+    expected.put(BasicPaloAltoFirewallParser.Module, "management");
+    expected.put(BasicPaloAltoFirewallParser.Severity, "high");
+    expected.put(BasicPaloAltoFirewallParser.Description, "Description_test");
+    expected.put(BasicPaloAltoFirewallParser.Seqno, "1354");
+    expected.put(BasicPaloAltoFirewallParser.ActionFlags, "0x0");
+    expected.put(BasicPaloAltoFirewallParser.DGH1, "12");
+    expected.put(BasicPaloAltoFirewallParser.DGH2, "34");
+    expected.put(BasicPaloAltoFirewallParser.DGH3, "45");
+    expected.put(BasicPaloAltoFirewallParser.DGH4, "0");
+    expected.put(BasicPaloAltoFirewallParser.VSYSName, "virSys1");
+    expected.put(BasicPaloAltoFirewallParser.DeviceName, 
"dev-something200-01");
+
+    expected.put(BasicPaloAltoFirewallParser.ParserVersion, 80);
+    expected.put("original_string", SYSTEM_80);
+    expected.put("timestamp", actual.get("timestamp"));
+
+    assertEquals(expected, actual);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testParseConfig61NoCustomFields() throws ParseException {
+    final String CONFIG_61_customFields = "1,2017/08/11 
12:37:58,008900008659,CONFIG,0,1,2017/08/11 
11:37:58,192.168.14.162,vsys1,edit,admin,Web,Succeeded, config shared 
log-settings config,1354,0x0";
+
+    JSONObject actual = parser.parse(CONFIG_61_customFields.getBytes()).get(0);
+
+    JSONObject expected = new JSONObject();
+    expected.put(BasicPaloAltoFirewallParser.PaloAltoDomain, "1");
+    expected.put(BasicPaloAltoFirewallParser.ReceiveTime, "2017/08/11 
12:37:58");
+    expected.put(BasicPaloAltoFirewallParser.SerialNum, "008900008659");
+    expected.put(BasicPaloAltoFirewallParser.Type, "CONFIG");
+    expected.put(BasicPaloAltoFirewallParser.ThreatContentType, "0");
+    expected.put(BasicPaloAltoFirewallParser.ConfigVersion, "1");
+    expected.put(BasicPaloAltoFirewallParser.GenerateTime, "2017/08/11 
11:37:58");
+
+    expected.put(BasicPaloAltoFirewallParser.HOST, "192.168.14.162");
+    expected.put(BasicPaloAltoFirewallParser.VirtualSystem, "vsys1");
+    expected.put(BasicPaloAltoFirewallParser.Command, "edit");
+    expected.put(BasicPaloAltoFirewallParser.Admin, "admin");
+    expected.put(BasicPaloAltoFirewallParser.Client, "Web");
+    expected.put(BasicPaloAltoFirewallParser.Result, "Succeeded");
+    expected.put(BasicPaloAltoFirewallParser.ConfigurationPath, "config shared 
log-settings config");
+    expected.put(BasicPaloAltoFirewallParser.Seqno, "1354");
+    expected.put(BasicPaloAltoFirewallParser.ActionFlags, "0x0");
+
+    expected.put(BasicPaloAltoFirewallParser.ParserVersion, 61);
+    expected.put("original_string", CONFIG_61_customFields);
+    expected.put("timestamp", actual.get("timestamp"));
+
+    assertEquals(expected, actual);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testParseConfig61CustomFields() throws ParseException {
+    final String CONFIG_61_noCustomFields = "1,2017/08/11 
12:37:58,008900008659,CONFIG,0,1,2017/08/11 
11:37:58,192.168.14.162,vsys1,edit,admin,Web,Succeeded, config shared 
log-settings 
config,1354,0x0,/FatherNode/KidNode/GrandsonNode1,/FatherNode/KidNode/GrandsonNode2";
+
+    JSONObject actual = 
parser.parse(CONFIG_61_noCustomFields.getBytes()).get(0);
+
+    JSONObject expected = new JSONObject();
+    expected.put(BasicPaloAltoFirewallParser.PaloAltoDomain, "1");
+    expected.put(BasicPaloAltoFirewallParser.ReceiveTime, "2017/08/11 
12:37:58");
+    expected.put(BasicPaloAltoFirewallParser.SerialNum, "008900008659");
+    expected.put(BasicPaloAltoFirewallParser.Type, "CONFIG");
+    expected.put(BasicPaloAltoFirewallParser.ThreatContentType, "0");
+    expected.put(BasicPaloAltoFirewallParser.ConfigVersion, "1");
+    expected.put(BasicPaloAltoFirewallParser.GenerateTime, "2017/08/11 
11:37:58");
+
+    expected.put(BasicPaloAltoFirewallParser.HOST, "192.168.14.162");
+    expected.put(BasicPaloAltoFirewallParser.VirtualSystem, "vsys1");
+    expected.put(BasicPaloAltoFirewallParser.Command, "edit");
+    expected.put(BasicPaloAltoFirewallParser.Admin, "admin");
+    expected.put(BasicPaloAltoFirewallParser.Client, "Web");
+    expected.put(BasicPaloAltoFirewallParser.Result, "Succeeded");
+    expected.put(BasicPaloAltoFirewallParser.ConfigurationPath, "config shared 
log-settings config");
+    expected.put(BasicPaloAltoFirewallParser.Seqno, "1354");
+    expected.put(BasicPaloAltoFirewallParser.ActionFlags, "0x0");
+    expected.put(BasicPaloAltoFirewallParser.BeforeChangeDetail, 
"/FatherNode/KidNode/GrandsonNode1");
+    expected.put(BasicPaloAltoFirewallParser.AfterChangeDetail, 
"/FatherNode/KidNode/GrandsonNode2");
+
+    expected.put(BasicPaloAltoFirewallParser.ParserVersion, 61);
+    expected.put("original_string", CONFIG_61_noCustomFields);
+    expected.put("timestamp", actual.get("timestamp"));
+
+    assertEquals(expected, actual);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testParseConfig70And80NoCustomFields() throws ParseException {
+    final String CONFIG_70_80_noCustomFields = "1,2017/08/11 
12:37:58,008900008659,CONFIG,0,1,2017/08/11 
11:37:58,192.168.14.162,vsys1,edit,admin,Web,Succeeded, config shared 
log-settings config,1354,0x0,12,34,45,0,virSys1,dev-something200-01";
+
+    JSONObject actual = 
parser.parse(CONFIG_70_80_noCustomFields.getBytes()).get(0);
+
+    JSONObject expected = new JSONObject();
+    expected.put(BasicPaloAltoFirewallParser.PaloAltoDomain, "1");
+    expected.put(BasicPaloAltoFirewallParser.ReceiveTime, "2017/08/11 
12:37:58");
+    expected.put(BasicPaloAltoFirewallParser.SerialNum, "008900008659");
+    expected.put(BasicPaloAltoFirewallParser.Type, "CONFIG");
+    expected.put(BasicPaloAltoFirewallParser.ThreatContentType, "0");
+    expected.put(BasicPaloAltoFirewallParser.ConfigVersion, "1");
+    expected.put(BasicPaloAltoFirewallParser.GenerateTime, "2017/08/11 
11:37:58");
+
+    expected.put(BasicPaloAltoFirewallParser.HOST, "192.168.14.162");
+    expected.put(BasicPaloAltoFirewallParser.VirtualSystem, "vsys1");
+    expected.put(BasicPaloAltoFirewallParser.Command, "edit");
+    expected.put(BasicPaloAltoFirewallParser.Admin, "admin");
+    expected.put(BasicPaloAltoFirewallParser.Client, "Web");
+    expected.put(BasicPaloAltoFirewallParser.Result, "Succeeded");
+    expected.put(BasicPaloAltoFirewallParser.ConfigurationPath, "config shared 
log-settings config");
+    expected.put(BasicPaloAltoFirewallParser.Seqno, "1354");
+    expected.put(BasicPaloAltoFirewallParser.ActionFlags, "0x0");
+    expected.put(BasicPaloAltoFirewallParser.DGH1, "12");
+    expected.put(BasicPaloAltoFirewallParser.DGH2, "34");
+    expected.put(BasicPaloAltoFirewallParser.DGH3, "45");
+    expected.put(BasicPaloAltoFirewallParser.DGH4, "0");
+    expected.put(BasicPaloAltoFirewallParser.VSYSName, "virSys1");
+    expected.put(BasicPaloAltoFirewallParser.DeviceName, 
"dev-something200-01");
+
+    expected.put(BasicPaloAltoFirewallParser.ParserVersion, 80);
+    expected.put("original_string", CONFIG_70_80_noCustomFields);
+    expected.put("timestamp", actual.get("timestamp"));
+
+    assertEquals(expected, actual);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testParseConfig70And80CustomFields() throws ParseException {
+    final String CONFIG_70_80_customFields = "1,2017/08/11 
12:37:58,008900008659,CONFIG,0,1,2017/08/11 
11:37:58,192.168.14.162,vsys1,edit,admin,Web,Succeeded,config shared 
log-settings 
config,/FatherNode/KidNode/GrandsonNode1,/FatherNode/KidNode/GrandsonNode2,1354,0x0,12,34,45,0,virSys1,dev-something200-01";
+
+    JSONObject actual = 
parser.parse(CONFIG_70_80_customFields.getBytes()).get(0);
+
+    JSONObject expected = new JSONObject();
+    expected.put(BasicPaloAltoFirewallParser.PaloAltoDomain, "1");
+    expected.put(BasicPaloAltoFirewallParser.ReceiveTime, "2017/08/11 
12:37:58");
+    expected.put(BasicPaloAltoFirewallParser.SerialNum, "008900008659");
+    expected.put(BasicPaloAltoFirewallParser.Type, "CONFIG");
+    expected.put(BasicPaloAltoFirewallParser.ThreatContentType, "0");
+    expected.put(BasicPaloAltoFirewallParser.ConfigVersion, "1");
+    expected.put(BasicPaloAltoFirewallParser.GenerateTime, "2017/08/11 
11:37:58");
+
+    expected.put(BasicPaloAltoFirewallParser.HOST, "192.168.14.162");
+    expected.put(BasicPaloAltoFirewallParser.VirtualSystem, "vsys1");
+    expected.put(BasicPaloAltoFirewallParser.Command, "edit");
+    expected.put(BasicPaloAltoFirewallParser.Admin, "admin");
+    expected.put(BasicPaloAltoFirewallParser.Client, "Web");
+    expected.put(BasicPaloAltoFirewallParser.Result, "Succeeded");
+    expected.put(BasicPaloAltoFirewallParser.ConfigurationPath, "config shared 
log-settings config");
+    expected.put(BasicPaloAltoFirewallParser.BeforeChangeDetail, 
"/FatherNode/KidNode/GrandsonNode1");
+    expected.put(BasicPaloAltoFirewallParser.AfterChangeDetail, 
"/FatherNode/KidNode/GrandsonNode2");
+    expected.put(BasicPaloAltoFirewallParser.Seqno, "1354");
+    expected.put(BasicPaloAltoFirewallParser.ActionFlags, "0x0");
+    expected.put(BasicPaloAltoFirewallParser.DGH1, "12");
+    expected.put(BasicPaloAltoFirewallParser.DGH2, "34");
+    expected.put(BasicPaloAltoFirewallParser.DGH3, "45");
+    expected.put(BasicPaloAltoFirewallParser.DGH4, "0");
+    expected.put(BasicPaloAltoFirewallParser.VSYSName, "virSys1");
+    expected.put(BasicPaloAltoFirewallParser.DeviceName, 
"dev-something200-01");
+
+    expected.put(BasicPaloAltoFirewallParser.ParserVersion, 80);
+    expected.put("original_string", CONFIG_70_80_customFields);
+    expected.put("timestamp", actual.get("timestamp"));
+
+    assertEquals(expected, actual);
+  }
+
   public static final String THREAT_60 = "1,2015/01/05 
05:38:58,0006C110285,THREAT,vulnerability,1,2015/01/05 
05:38:58,10.0.0.115,216.0.10.198,0.0.0.0,0.0.0.0,EX-Allow,example\\user.name,,web-browsing,vsys1,internal,external,ethernet1/2,ethernet1/1,LOG-Default,2015/01/05
 
05:38:58,12031,1,54180,80,0,0,0x80004000,tcp,reset-both,\"ad.aspx?f=300x250&id=12;tile=1;ord=67AF705D60B1119C0F18BEA336F9\",HTTP:
 IIS Denial Of Service 
Attempt(40019),any,high,client-to-server,347368099,0x0,10.0.0.0-10.255.255.255,US,0,,1200568889751109656,,";
 
   @SuppressWarnings("unchecked")
@@ -88,6 +306,7 @@ public class BasicPaloAltoFirewallParserTest extends 
AbstractParserConfigTest {
   }
 
   public static final String TRAFFIC_60 = "1,2015/01/05 
12:51:33,0011C103117,TRAFFIC,end,1,2015/01/05 
12:51:33,10.0.0.39,10.1.0.163,0.0.0.0,0.0.0.0,EX-Allow,,example\\\\user.name,ms-ds-smb,vsys1,v_external,v_internal,ethernet1/2,ethernet1/1,LOG-Default,2015/01/05
 12:51:33,33760927,1,52688,445,0,0,0x401a,tcp,allow,2229,1287,942,10,2015/01/05 
12:51:01,30,any,0,17754932062,0x0,10.0.0.0-10.255.255.255,10.0.0.0-10.255.255.255,0,6,";
+
   @SuppressWarnings("unchecked")
   @Test
   public void testParseTraffic60() throws ParseException {
@@ -142,6 +361,7 @@ public class BasicPaloAltoFirewallParserTest extends 
AbstractParserConfigTest {
   }
 
   public static final String THREAT_70 = "1,2017/05/24 
09:53:10,001801000001,THREAT,virus,0,2017/05/24 
09:53:10,217.1.2.3,10.1.8.7,217.1.2.3,214.123.1.2,WLAN-Internet,,user,web-browsing,vsys1,Untrust,wifi_zone,ethernet1/1,vlan.1,Std-Log-Forward,2017/05/24
 
09:53:10,49567,1,80,51787,80,25025,0x400000,tcp,reset-both,\"abcdef310.exe\",Virus/Win32.WGeneric.lumeo(2457399),computer-and-internet-info,medium,server-to-client,329423829,0x0,DE,10.0.0.0-10.255.255.255,0,,0,,,1,,,\"\",\"\",,,,0,19,0,0,0,,PAN1,";
+
   @SuppressWarnings("unchecked")
   @Test
   public void testParseThreat70() throws ParseException {
@@ -202,6 +422,7 @@ public class BasicPaloAltoFirewallParserTest extends 
AbstractParserConfigTest {
   }
 
   public static final String TRAFFIC_70 = "1,2017/05/25 
21:38:13,001606000003,TRAFFIC,drop,1,2017/05/25 
21:38:13,10.2.1.8,192.168.1.10,0.0.0.0,0.0.0.0,DropLog,,,not-applicable,vsys1,intern,VPN,vlan.1,,Std-Log-Forward,2017/05/25
 21:38:13,0,1,137,137,0,0,0x0,udp,deny,114,114,0,1,2017/05/25 
21:38:12,0,any,0,9953744,0x0,192.168.0.0-192.168.255.255,DE,0,1,0,policy-deny,19,0,0,0,,PAN1,from-policy";
+
   @SuppressWarnings("unchecked")
   @Test
   public void testParseTraffic70() throws ParseException {
@@ -262,6 +483,7 @@ public class BasicPaloAltoFirewallParserTest extends 
AbstractParserConfigTest {
   }
 
   public static final String TRAFFIC_71 = "1,2017/05/31 
23:59:57,0006C000005,TRAFFIC,drop,0,2017/05/31 
23:59:57,185.94.1.1,201.1.4.5,0.0.0.0,0.0.0.0,DropLog,,,not-applicable,vsys1,untrust,untrust,vlan.1,,Standard-Syslog,2017/05/31
 23:59:57,0,1,59836,123,0,0,0x0,udp,deny,60,60,0,1,2017/05/31 
23:59:57,0,any,0,3433072193,0x0,RU,DE,0,1,0,policy-deny,16,11,0,0,,PAN1,from-policy";
+
   @SuppressWarnings("unchecked")
   @Test
   public void testParseTraffic71() throws ParseException {
@@ -322,6 +544,7 @@ public class BasicPaloAltoFirewallParserTest extends 
AbstractParserConfigTest {
   }
 
   public static final String THREAT_71 = "1,2017/05/25 
19:31:13,0006C000005,THREAT,url,0,2017/05/25 
19:31:13,192.168.1.7,140.177.26.29,201.1.4.5,140.177.26.29,ms_out,,,ssl,vsys1,mgmt,untrust,vlan.199,vlan.1,Standard-Syslog,2017/05/25
 
19:31:13,50556,1,56059,443,14810,443,0x40b000,tcp,alert,\"settings-win.data.microsoft.com/\",(9999),computer-and-internet-info,informational,client-to-server,10030265,0x0,192.168.0.0-192.168.255.255,IE,0,,0,,,0,,,,,,,,0,16,11,0,0,,PAN1,";
+
   @SuppressWarnings("unchecked")
   @Test
   public void testParseThreat71() throws ParseException {
@@ -381,6 +604,7 @@ public class BasicPaloAltoFirewallParserTest extends 
AbstractParserConfigTest {
   }
 
   public static final String THREAT_80 = "1,2018/02/01 
21:29:03,001606000007,THREAT,vulnerability,1,2018/02/01 
21:29:03,213.211.198.62,172.16.2.6,213.211.198.62,192.168.178.202,Outgoing,,,web-browsing,vsys1,internet,guest,ethernet1/1,ethernet1/2.2,test,2018/02/01
 
21:29:03,18720,1,80,53161,80,32812,0x402000,tcp,reset-server,\"www.eicar.org/download/eicar.com\",Eicar
 File 
Detected(39040),computer-and-internet-info,medium,server-to-client,27438839,0x0,Germany,172.16.0.0-172.31.255.255,0,,0,,,9,,,,,,,,0,0,0,0,0,,PAN1,,,,,0,,0,,N/A,code-execution,AppThreat-771-4450,0x0";
+
   @SuppressWarnings("unchecked")
   @Test
   public void testParseThreat80() throws ParseException {
@@ -445,6 +669,7 @@ public class BasicPaloAltoFirewallParserTest extends 
AbstractParserConfigTest {
   }
 
   public static final String TRAFFIC_80 = "1,2018/02/01 
21:24:11,001606000007,TRAFFIC,end,1,2018/02/01 
21:24:11,172.16.2.31,134.19.6.22,192.168.18.2,134.19.6.22,Outgoing,,,ssl,vsys1,guest,internet,ethernet1/2.2,ethernet1/1,test,2018/02/01
 
21:24:11,19468,1,41537,443,12211,443,0x40001c,tcp,allow,7936,1731,6205,24,2018/02/01
 
21:00:42,1395,computer-and-internet-info,0,62977478,0x0,172.16.0.0-172.31.255.255,United
 States,0,14,10,tcp-rst-from-client,0,0,0,0,,PAN1,from-policy,,,0,,0,,N/A";
+
   @SuppressWarnings("unchecked")
   @Test
   public void testParseTraffic80() throws ParseException {
@@ -507,4 +732,23 @@ public class BasicPaloAltoFirewallParserTest extends 
AbstractParserConfigTest {
     expected.put(BasicPaloAltoFirewallParser.DeviceName, "PAN1");
     assertEquals(expected, actual);
   }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testParseInvalidLogTypeMessage() throws ParseException {
+    final String unsupportedLogTypeMessage = "1,2017/08/11 
12:37:58,008900008659,INVALIDlogType,0,1,2017/08/11 
11:37:58,192.168.14.162,vsys1,edit,admin,Web,Succeeded, config shared 
log-settings config,1354,0x0";
+    List<JSONObject> actual = 
parser.parse(unsupportedLogTypeMessage.getBytes());
+
+    assertNull(actual);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testParseInvalidVersionMessage() throws ParseException {
+    final String invalidLengthMessage = "1,2017/08/11 
12:37:58,008900008659,CONFIG,0,1,2017/08/11 
11:37:58,192.168.14.162,vsys1,edit,admin,Web,Succeeded, config shared 
log-settings config";
+
+    JSONObject actual = parser.parse(invalidLengthMessage.getBytes()).get(0);
+    String expectedParserVersion = 
actual.get(BasicPaloAltoFirewallParser.ParserVersion).toString();
+    assertEquals(expectedParserVersion, "0");
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/syslog/Syslog5424ParserTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/syslog/Syslog5424ParserTest.java
 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/syslog/Syslog5424ParserTest.java
index 0ef26ff..b3e4507 100644
--- 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/syslog/Syslog5424ParserTest.java
+++ 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/syslog/Syslog5424ParserTest.java
@@ -20,6 +20,7 @@ package org.apache.metron.parsers.syslog;
 
 import com.github.palindromicity.syslog.NilPolicy;
 import com.github.palindromicity.syslog.dsl.SyslogFieldKeys;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
@@ -28,6 +29,7 @@ import java.time.format.DateTimeFormatter;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Consumer;
 
 public class Syslog5424ParserTest {
@@ -87,7 +89,7 @@ public class Syslog5424ParserTest {
     });
   }
 
-  @Test(expected = IllegalStateException.class)
+  @Test()
   public void testNotValid() {
     test(null, "not valid", (message) -> Assert.assertTrue(false));
   }
@@ -100,7 +102,7 @@ public class Syslog5424ParserTest {
     }
     parser.configure(config);
 
-    List<JSONObject> output = parser.parse(line.getBytes());
+    parser.parseOptionalResult(line.getBytes());
   }
 
   @Test
@@ -116,8 +118,33 @@ public class Syslog5424ParserTest {
             .append(SYSLOG_LINE_MISSING)
             .append("\n")
             .append(SYSLOG_LINE_ALL);
-    List<JSONObject> output = parser.parse(builder.toString().getBytes());
-    Assert.assertEquals(3,output.size());
+    Optional<MessageParserResult<JSONObject>> resultOptional = 
parser.parseOptionalResult(builder.toString().getBytes());
+    Assert.assertNotNull(resultOptional);
+    Assert.assertTrue(resultOptional.isPresent());
+    List<JSONObject> parsedList = resultOptional.get().getMessages();
+    Assert.assertEquals(3,parsedList.size());
+  }
+
+  @Test
+  public void testReadMultiLineWithErrors() throws Exception {
+    Syslog5424Parser parser = new Syslog5424Parser();
+    Map<String, Object> config = new HashMap<>();
+    config.put(Syslog5424Parser.NIL_POLICY_CONFIG, NilPolicy.DASH.name());
+    parser.configure(config);
+    StringBuilder builder = new StringBuilder();
+    builder
+            .append("HEREWEGO!!!!\n")
+            .append(SYSLOG_LINE_ALL)
+            .append("\n")
+            .append(SYSLOG_LINE_MISSING)
+            .append("\n")
+            .append("BOOM!\n")
+            .append(SYSLOG_LINE_ALL)
+            .append("\nOHMY!");
+    Optional<MessageParserResult<JSONObject>> output = 
parser.parseOptionalResult(builder.toString().getBytes());
+    Assert.assertTrue(output.isPresent());
+    Assert.assertEquals(3,output.get().getMessages().size());
+    Assert.assertEquals(3,output.get().getMessageThrowables().size());
   }
 
   @Test
@@ -126,21 +153,29 @@ public class Syslog5424ParserTest {
     Map<String, Object> config = new HashMap<>();
     config.put(Syslog5424Parser.NIL_POLICY_CONFIG, NilPolicy.DASH.name());
     parser.configure(config);
-    List<JSONObject> output = 
parser.parse(SYSLOG_LINE_MISSING_DATE.getBytes());
-    String timeStampString = output.get(0).get("timestamp").toString();
+    Optional<MessageParserResult<JSONObject>> output  = 
parser.parseOptionalResult(SYSLOG_LINE_MISSING_DATE.getBytes());
+    Assert.assertNotNull(output);
+    Assert.assertTrue(output.isPresent());
+    String timeStampString = 
output.get().getMessages().get(0).get("timestamp").toString();
     DateTimeFormatter.ISO_DATE_TIME.parse(timeStampString);
     config.clear();
     config.put(Syslog5424Parser.NIL_POLICY_CONFIG, NilPolicy.NULL.name());
     parser.configure(config);
-    output = parser.parse(SYSLOG_LINE_MISSING_DATE.getBytes());
-    timeStampString = output.get(0).get("timestamp").toString();
+    output = parser.parseOptionalResult(SYSLOG_LINE_MISSING_DATE.getBytes());
+    Assert.assertNotNull(output);
+    Assert.assertTrue(output.isPresent());
+    timeStampString = 
output.get().getMessages().get(0).get("timestamp").toString();
     DateTimeFormatter.ISO_DATE_TIME.parse(timeStampString);
 
     config.clear();
     config.put(Syslog5424Parser.NIL_POLICY_CONFIG, NilPolicy.OMIT.name());
     parser.configure(config);
-    output = parser.parse(SYSLOG_LINE_MISSING_DATE.getBytes());
-    timeStampString = output.get(0).get("timestamp").toString();
+
+    output = parser.parseOptionalResult(SYSLOG_LINE_MISSING_DATE.getBytes());
+    Assert.assertNotNull(output);
+    Assert.assertTrue(output.isPresent());
+
+    timeStampString = 
output.get().getMessages().get(0).get("timestamp").toString();
     DateTimeFormatter.ISO_DATE_TIME.parse(timeStampString);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
index 230c147..eb447d0 100644
--- 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
+++ 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
@@ -24,11 +24,14 @@ import java.time.*;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.log4j.Level;
 import org.apache.metron.parsers.GrokParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.json.simple.JSONObject;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -54,7 +57,10 @@ public class GrokWebSphereParserTest {
                parser.configure(parserConfig);
                String testString = "<133>Apr 15 17:47:28 ABCXML1413 
[rojOut][0x81000033][auth][notice] user(rick007): "
                                + "[120.43.200.6]: User logged into 'cohlOut'.";
-               List<JSONObject> result = parser.parse(testString.getBytes());
+               Optional<MessageParserResult<JSONObject>> resultOptional = 
parser.parseOptionalResult(testString.getBytes());
+               Assert.assertNotNull(resultOptional);
+               Assert.assertTrue(resultOptional.isPresent());
+               List<JSONObject> result = resultOptional.get().getMessages();
                JSONObject parsedJSON = result.get(0);
 
                long expectedTimestamp = 
ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 17, 47, 28, 0, 
UTC).toInstant().toEpochMilli();
@@ -73,14 +79,17 @@ public class GrokWebSphereParserTest {
        }
        
        @Test
-       public void tetsParseLogoutLine() throws Exception {
+       public void testParseLogoutLine() throws Exception {
                
                //Set up parser, parse message
                GrokWebSphereParser parser = new GrokWebSphereParser();
                parser.configure(parserConfig);
                String testString = "<134>Apr 15 18:02:27 PHIXML3RWD 
[0x81000019][auth][info] [14.122.2.201]: "
                                + "User 'hjpotter' logged out from 'default'.";
-               List<JSONObject> result = parser.parse(testString.getBytes());
+               Optional<MessageParserResult<JSONObject>> resultOptional = 
parser.parseOptionalResult(testString.getBytes());
+               Assert.assertNotNull(resultOptional);
+               Assert.assertTrue(resultOptional.isPresent());
+               List<JSONObject> result = resultOptional.get().getMessages();
                JSONObject parsedJSON = result.get(0);
 
                long expectedTimestamp = 
ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 18, 2, 27, 0, 
UTC).toInstant().toEpochMilli();
@@ -98,14 +107,17 @@ public class GrokWebSphereParserTest {
        }
        
        @Test
-       public void tetsParseRBMLine() throws Exception {
+       public void testParseRBMLine() throws Exception {
                
                //Set up parser, parse message
                GrokWebSphereParser parser = new GrokWebSphereParser();
                parser.configure(parserConfig);
                String testString = "<131>Apr 15 17:36:35 ROBXML3QRS 
[0x80800018][auth][error] rbm(RBM-Settings): "
                                + "trans(3502888135)[request] gtid(3502888135): 
RBM: Resource access denied.";
-               List<JSONObject> result = parser.parse(testString.getBytes());
+               Optional<MessageParserResult<JSONObject>> resultOptional = 
parser.parseOptionalResult(testString.getBytes());
+               Assert.assertNotNull(resultOptional);
+               Assert.assertTrue(resultOptional.isPresent());
+               List<JSONObject> result = resultOptional.get().getMessages();
                JSONObject parsedJSON = result.get(0);
 
                long expectedTimestamp = 
ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 17, 36, 35, 0, 
UTC).toInstant().toEpochMilli();
@@ -122,16 +134,18 @@ public class GrokWebSphereParserTest {
        }
        
        @Test
-       public void tetsParseOtherLine() throws Exception {
+       public void testParseOtherLine() throws Exception {
                
                //Set up parser, parse message
                GrokWebSphereParser parser = new GrokWebSphereParser();
                parser.configure(parserConfig);
                String testString = "<134>Apr 15 17:17:34 SAGPXMLQA333 
[0x8240001c][audit][info] trans(191): (admin:default:system:*): "
                                + "ntp-service 'NTP Service' - Operational 
state down";
-               List<JSONObject> result = parser.parse(testString.getBytes());
+               Optional<MessageParserResult<JSONObject>> resultOptional = 
parser.parseOptionalResult(testString.getBytes());
+               Assert.assertNotNull(resultOptional);
+               Assert.assertTrue(resultOptional.isPresent());
+               List<JSONObject> result = resultOptional.get().getMessages();
                JSONObject parsedJSON = result.get(0);
-
                long expectedTimestamp = 
ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 17, 17, 34, 0, 
UTC).toInstant().toEpochMilli();
                
                //Compare fields
@@ -153,7 +167,10 @@ public class GrokWebSphereParserTest {
                parser.configure(parserConfig);
                String testString = "<133>Apr 15 17:47:28 ABCXML1413 
[rojOut][0x81000033][auth][notice] rick007): "
                                + "[120.43.200. User logged into 'cohlOut'.";
-               List<JSONObject> result = parser.parse(testString.getBytes());
+               Optional<MessageParserResult<JSONObject>> resultOptional = 
parser.parseOptionalResult(testString.getBytes());
+               Assert.assertNotNull(resultOptional);
+               Assert.assertTrue(resultOptional.isPresent());
+               List<JSONObject> result = resultOptional.get().getMessages();
                JSONObject parsedJSON = result.get(0);
 
                long expectedTimestamp = 
ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 17, 47, 28, 0, 
UTC).toInstant().toEpochMilli();
@@ -172,14 +189,17 @@ public class GrokWebSphereParserTest {
        }
        
        @Test
-       public void tetsParseMalformedLogoutLine() throws Exception {
+       public void testParseMalformedLogoutLine() throws Exception {
                
                //Set up parser, attempt to parse malformed message
                GrokWebSphereParser parser = new GrokWebSphereParser();
                parser.configure(parserConfig);
                String testString = "<134>Apr 15 18:02:27 PHIXML3RWD 
[0x81000019][auth][info] [14.122.2.201: "
                                + "User 'hjpotter' logged out from 'default.";
-               List<JSONObject> result = parser.parse(testString.getBytes());
+               Optional<MessageParserResult<JSONObject>> resultOptional = 
parser.parseOptionalResult(testString.getBytes());
+               Assert.assertNotNull(resultOptional);
+               Assert.assertTrue(resultOptional.isPresent());
+               List<JSONObject> result = resultOptional.get().getMessages();
                JSONObject parsedJSON = result.get(0);
 
                long expectedTimestamp = 
ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 18, 2, 27, 0, 
UTC).toInstant().toEpochMilli();
@@ -197,14 +217,17 @@ public class GrokWebSphereParserTest {
        }
        
        @Test
-       public void tetsParseMalformedRBMLine() throws Exception {
+       public void testParseMalformedRBMLine() throws Exception {
                
                //Set up parser, parse message
                GrokWebSphereParser parser = new GrokWebSphereParser();
                parser.configure(parserConfig);
                String testString = "<131>Apr 15 17:36:35 ROBXML3QRS 
[0x80800018][auth][error] rbmRBM-Settings): "
                                + "trans3502888135)[request] gtid3502888135) 
RBM: Resource access denied.";
-               List<JSONObject> result = parser.parse(testString.getBytes());
+               Optional<MessageParserResult<JSONObject>> resultOptional = 
parser.parseOptionalResult(testString.getBytes());
+               Assert.assertNotNull(resultOptional);
+               Assert.assertTrue(resultOptional.isPresent());
+               List<JSONObject> result = resultOptional.get().getMessages();
                JSONObject parsedJSON = result.get(0);
 
                long expectedTimestamp = 
ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 17, 36, 35, 0, 
UTC).toInstant().toEpochMilli();
@@ -221,14 +244,17 @@ public class GrokWebSphereParserTest {
        }
        
        @Test
-       public void tetsParseMalformedOtherLine() throws Exception {
+       public void testParseMalformedOtherLine() throws Exception {
                
                //Set up parser, parse message
                GrokWebSphereParser parser = new GrokWebSphereParser();
                parser.configure(parserConfig);
                String testString = "<134>Apr 15 17:17:34 SAGPXMLQA333 
[0x8240001c][audit][info] trans 191)  admindefaultsystem*): "
                                + "ntp-service 'NTP Service' - Operational 
state down:";
-               List<JSONObject> result = parser.parse(testString.getBytes());
+               Optional<MessageParserResult<JSONObject>> resultOptional = 
parser.parseOptionalResult(testString.getBytes());
+               Assert.assertNotNull(resultOptional);
+               Assert.assertTrue(resultOptional.isPresent());
+               List<JSONObject> result = resultOptional.get().getMessages();
                JSONObject parsedJSON = result.get(0);
 
                long expectedTimestamp = 
ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 17, 17, 34, 0, 
UTC).toInstant().toEpochMilli();
@@ -244,17 +270,4 @@ public class GrokWebSphereParserTest {
                assertEquals("trans 191)  admindefaultsystem*): ntp-service 
'NTP Service' - Operational state down:", parsedJSON.get("message"));
        }
        
-       
-       @Test(expected=RuntimeException.class)
-       public void testParseEmptyLine() throws Exception {
-               
-               //Set up parser, attempt to parse malformed message
-               GrokWebSphereParser parser = new GrokWebSphereParser();
-               parser.configure(parserConfig);
-               String testString = "";
-               UnitTestHelper.setLog4jLevel(GrokParser.class, Level.FATAL);
-               List<JSONObject> result = parser.parse(testString.getBytes());
-               UnitTestHelper.setLog4jLevel(GrokParser.class, Level.ERROR);
-       }
-               
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_log.txt
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_log.txt 
b/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_log.txt
new file mode 100644
index 0000000..95d3fec
--- /dev/null
+++ 
b/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_log.txt
@@ -0,0 +1,10 @@
+2018-08-30T21:02:32.047266Z vault-development 192.168.113.53:60002 
192.168.82.130:8200 0.000695 0.000017 0.000015 - - 607 3078 "- - - " "-" - -
+2018-08-30T21:02:22.119595Z vault-development 192.168.113.29:57322 
192.168.72.75:8200 0.00051 0.00001 0.000013 - - 607 3079 "- - - " "-" - -
+2018-08-30T21:05:58.275961Z vault-production 192.168.205.159:58390 
192.168.68.196:8200 0.000767 0.000009 0.000012 - - 673 3210 "- - - " "-" - -
+2018-08-30T21:05:59.222277Z vault-production 192.168.228.182:26358 
192.168.81.224:8200 0.000882 0.000014 0.000024 - - 519 3920 "- - - " "-" - -
+2018-08-30T21:05:59.234471Z vault-production 192.168.228.182:35506 
192.168.79.6:8200 0.000377 0.000011 0.000009 - - 519 3919 "- - - " "-" - -
+2018-08-30T21:05:59.237375Z vault-production 192.168.228.182:52516 
192.168.68.196:8200 0.000628 0.000007 0.00001 - - 519 3918 "- - - " "-" - -
+2018-08-30T21:06:05.235460Z vault-production 192.168.228.182:41783 
192.168.79.6:8200 0.000309 0.000006 0.00001 - - 519 3918 "- - - " "-" - -
+2018-08-30T21:06:05.226698Z vault-production 192.168.228.182:40008 
192.168.81.224:8200 0.000955 0.000014 0.000013 - - 519 3919 "- - - " "-" - -
+2018-08-30T21:06:05.237946Z vault-production 192.168.228.182:19261 
192.168.68.196:8200 0.000661 0.000006 0.000009 - - 519 3918 "- - - " "-" - -
+2018-08-30T21:06:11.229542Z vault-production 192.168.228.182:44082 
192.168.81.224:8200 0.000912 0.000009 0.000014 - - 519 3919 "- - - " "-" - -

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_with_errors_log.txt
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_with_errors_log.txt
 
b/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_with_errors_log.txt
new file mode 100644
index 0000000..3525fc4
--- /dev/null
+++ 
b/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_with_errors_log.txt
@@ -0,0 +1,13 @@
+2018-08-30T21:02:32.047266Z vault-development 192.168.113.53:60002 
192.168.82.130:8200 0.000695 0.000017 0.000015 - - 607 3078 "- - - " "-" - -
+2018-08-30T21:02:22.119595Z vault-development 192.168.113.29:57322 
192.168.72.75:8200 0.00051 0.00001 0.000013 - - 607 3079 "- - - " "-" - -
+2018-08-30T21:05:58.275961Z vault-production 192.168.205.159:58390 
192.168.68.196:8200 0.000767 0.000009 0.000012 - - 673 3210 "- - - " "-" - -
+2018-08-30T21:05:59.222277Z vault-production 192.168.228.182:26358 
192.168.81.224:8200 0.000882 0.000014 0.000024 - - 519 3920 "- - - " "-" - -
+BOOM
+BLAM
+BOP
+2018-08-30T21:05:59.234471Z vault-production 192.168.228.182:35506 
192.168.79.6:8200 0.000377 0.000011 0.000009 - - 519 3919 "- - - " "-" - -
+2018-08-30T21:05:59.237375Z vault-production 192.168.228.182:52516 
192.168.68.196:8200 0.000628 0.000007 0.00001 - - 519 3918 "- - - " "-" - -
+2018-08-30T21:06:05.235460Z vault-production 192.168.228.182:41783 
192.168.79.6:8200 0.000309 0.000006 0.00001 - - 519 3918 "- - - " "-" - -
+2018-08-30T21:06:05.226698Z vault-production 192.168.228.182:40008 
192.168.81.224:8200 0.000955 0.000014 0.000013 - - 519 3919 "- - - " "-" - -
+2018-08-30T21:06:05.237946Z vault-production 192.168.228.182:19261 
192.168.68.196:8200 0.000661 0.000006 0.000009 - - 519 3918 "- - - " "-" - -
+2018-08-30T21:06:11.229542Z vault-production 192.168.228.182:44082 
192.168.81.224:8200 0.000912 0.000009 0.000014 - - 519 3919 "- - - " "-" - -

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-solr/src/main/config/schema/bro/schema.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/main/config/schema/bro/schema.xml 
b/metron-platform/metron-solr/src/main/config/schema/bro/schema.xml
index ea9f6d3..6be76a0 100644
--- a/metron-platform/metron-solr/src/main/config/schema/bro/schema.xml
+++ b/metron-platform/metron-solr/src/main/config/schema/bro/schema.xml
@@ -34,6 +34,7 @@
          * Metron-specific fields
   -->
   <field name="source.type" type="string" indexed="true" stored="true" />
+  <field name="alert_status" type="string" indexed="true" stored="true" />
   <field name="timestamp" type="timestamp" indexed="true" stored="true" />
   <field name="guid" type="string" indexed="true" stored="true" 
required="true" multiValued="false" />
   <uniqueKey>guid</uniqueKey>

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-solr/src/main/config/schema/snort/schema.xml
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-solr/src/main/config/schema/snort/schema.xml 
b/metron-platform/metron-solr/src/main/config/schema/snort/schema.xml
index 84855df..3c57574 100644
--- a/metron-platform/metron-solr/src/main/config/schema/snort/schema.xml
+++ b/metron-platform/metron-solr/src/main/config/schema/snort/schema.xml
@@ -22,6 +22,7 @@
   <!-- Metron specific fields -->
   <field name="timestamp" type="timestamp" indexed="true" stored="true" />
   <field name="source.type" type="string" indexed="true" stored="true" />
+  <field name="alert_status" type="string" indexed="true" stored="true" />
   <field name="guid" type="string" indexed="true" stored="true" 
required="true" multiValued="false" />
   <uniqueKey>guid</uniqueKey>
 

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-solr/src/main/config/schema/yaf/schema.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/main/config/schema/yaf/schema.xml 
b/metron-platform/metron-solr/src/main/config/schema/yaf/schema.xml
index 5555a14..37e5f12 100644
--- a/metron-platform/metron-solr/src/main/config/schema/yaf/schema.xml
+++ b/metron-platform/metron-solr/src/main/config/schema/yaf/schema.xml
@@ -22,6 +22,7 @@
   <!-- Metron specific fields -->
   <field name="timestamp" type="timestamp" indexed="true" stored="true" />
   <field name="source.type" type="string" indexed="true" stored="true" />
+  <field name="alert_status" type="string" indexed="true" stored="true" />
   <field name="guid" type="string" indexed="true" stored="true" 
required="true" multiValued="false" />
   <uniqueKey>guid</uniqueKey>
 

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java
 
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java
index 4390fd1..84f2222 100644
--- 
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java
+++ 
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java
@@ -133,7 +133,7 @@ public class SolrSearchIntegrationTest extends 
SearchIntegrationTest {
       Assert.assertEquals(FieldType.OTHER, fieldTypes.get("timestamp"));
 
       // Bro only field in the dynamic catch all
-      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("bro_field"));
+      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("method"));
 
       // A field is in both bro and snort and they have different types.
       Assert.assertEquals(FieldType.TEXT, fieldTypes.get("ttl"));
@@ -169,7 +169,7 @@ public class SolrSearchIntegrationTest extends 
SearchIntegrationTest {
       Assert.assertEquals(FieldType.OTHER, fieldTypes.get("timestamp"));
 
       // Snort only field in the dynamic catch all
-      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("snort_field"));
+      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("sig_generator"));
 
       // A field is in both bro and snort and they have different types.
       Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ttl"));
@@ -211,10 +211,10 @@ public class SolrSearchIntegrationTest extends 
SearchIntegrationTest {
     Assert.assertEquals(FieldType.OTHER, fieldTypes.get("timestamp"));
 
     // Bro only field in the dynamic catch all
-    Assert.assertEquals(FieldType.OTHER, fieldTypes.get("bro_field"));
+    Assert.assertEquals(FieldType.TEXT, fieldTypes.get("method"));
 
     // Snort only field in the dynamic catch all
-    Assert.assertEquals(FieldType.OTHER, fieldTypes.get("snort_field"));
+    Assert.assertEquals(FieldType.TEXT, fieldTypes.get("sig_generator"));
 
     // A field is in both bro and snort and they have different types.
     Assert.assertEquals(FieldType.OTHER, fieldTypes.get("ttl"));

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-writer/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/pom.xml 
b/metron-platform/metron-writer/pom.xml
index e845516..6d08093 100644
--- a/metron-platform/metron-writer/pom.xml
+++ b/metron-platform/metron-writer/pom.xml
@@ -207,6 +207,12 @@
             <artifactId>metron-common</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-test-utilities</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
index 7678584..68585c5 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -40,6 +40,7 @@ import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -118,12 +119,18 @@ public class BulkWriterComponent<MESSAGE_T> {
 
   public void error(String sensorType, Throwable e, Iterable<Tuple> tuples, 
MessageGetStrategy messageGetStrategy) {
     LOG.error(format("Failing %d tuple(s); sensorType=%s", 
Iterables.size(tuples), sensorType), e);
-    MetronError error = new MetronError()
-            .withSensorType(Collections.singleton(sensorType))
-            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
-            .withThrowable(e);
-    tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t)));
-    handleError(tuples, error);
+    tuples.forEach(t -> {
+      MetronError error = new MetronError()
+              .withSensorType(Collections.singleton(sensorType))
+              .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+              .withThrowable(e)
+              .addRawMessage(messageGetStrategy.get(t));
+      collector.emit(Constants.ERROR_STREAM, new 
Values(error.getJSONObject()));
+      collector.ack(t);
+    });
+    // there is only one error to report for all of the failed tuples
+    collector.reportError(e);
+
   }
 
   /**
@@ -133,24 +140,24 @@ public class BulkWriterComponent<MESSAGE_T> {
    * <p>Without a valid message, the JSON message cannot be added to the error.
    *
    * @param e The exception that occurred.
-   * @param tuples The tuples to error that may not contain valid messages.
+   * @param tuple The tuple to error that may not contain a valid message.
    */
-  public void error(Throwable e, Iterable<Tuple> tuples) {
-    LOG.error(format("Failing %d tuple(s)", Iterables.size(tuples)), e);
+  public void error(Throwable e, Tuple tuple) {
+    LOG.error("Failing tuple", e);
     MetronError error = new MetronError()
             .withErrorType(Constants.ErrorType.INDEXING_ERROR)
             .withThrowable(e);
-    handleError(tuples, error);
+    handleError(tuple, error);
   }
 
   /**
    * Errors a set of tuples.
    *
-   * @param tuples The tuples to error.
+   * @param tuple The tuple to error.
    * @param error
    */
-  private void handleError(Iterable<Tuple> tuples, MetronError error) {
-    tuples.forEach(t -> collector.ack(t));
+  private void handleError(Tuple tuple, MetronError error) {
+    collector.ack(tuple);
     ErrorUtils.handleError(collector, error);
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
index 4bb3888..590ab8c 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
@@ -311,7 +311,7 @@ public class BulkMessageWriterBolt<CONFIG_T extends 
Configurations> extends Conf
     LOG.debug("Unable to extract message from tuple; expected valid JSON");
     getWriterComponent().error(
             new Exception("Unable to extract message from tuple; expected 
valid JSON"),
-            ImmutableList.of(tuple)
+            tuple
     );
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
index c389854..decf3a5 100644
--- 
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
+++ 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
@@ -18,10 +18,12 @@
 package org.apache.metron.writer;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.powermock.api.mockito.PowerMockito.verifyStatic;
@@ -36,8 +38,10 @@ import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.test.error.MetronErrorJSONMatcher;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 import org.json.simple.JSONObject;
 import org.junit.Before;
 import org.junit.Rule;
@@ -127,9 +131,12 @@ public class BulkWriterComponentTest {
   @Test
   public void writeShouldProperlyHandleWriterErrors() throws Exception {
     Throwable e = new Exception("test exception");
-    MetronError error = new MetronError()
+    MetronError expectedError1 = new MetronError()
             .withSensorType(Collections.singleton(sensorType))
-            
.withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Arrays.asList(message1,
 message2));
+            
.withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message1));
+    MetronError expectedError2 = new MetronError()
+            .withSensorType(Collections.singleton(sensorType))
+            
.withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message2));
     BulkWriterResponse response = new BulkWriterResponse();
     response.addAllErrors(e, tupleList);
 
@@ -139,8 +146,14 @@ public class BulkWriterComponentTest {
     bulkWriterComponent.write(sensorType, tuple1, message1, bulkMessageWriter, 
configurations, messageGetStrategy);
     bulkWriterComponent.write(sensorType, tuple2, message2, bulkMessageWriter, 
configurations, messageGetStrategy);
 
-    verifyStatic(times(1));
-    ErrorUtils.handleError(collector, error);
+    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
+            new Values(argThat(new 
MetronErrorJSONMatcher(expectedError1.getJSONObject()))));
+    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
+            new Values(argThat(new 
MetronErrorJSONMatcher(expectedError2.getJSONObject()))));
+    verify(collector, times(1)).ack(tuple1);
+    verify(collector, times(1)).ack(tuple2);
+    verify(collector, times(1)).reportError(e);
+    verifyNoMoreInteractions(collector);
   }
 
   @Test
@@ -161,9 +174,16 @@ public class BulkWriterComponentTest {
   @Test
   public void writeShouldProperlyHandleWriterException() throws Exception {
     Throwable e = new Exception("test exception");
-    MetronError error = new MetronError()
+    MetronError expectedError1 = new MetronError()
+            .withSensorType(Collections.singleton(sensorType))
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+            .withThrowable(e)
+            .withRawMessages(Collections.singletonList(message1));
+    MetronError expectedError2 = new MetronError()
             .withSensorType(Collections.singleton(sensorType))
-            
.withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Arrays.asList(message1,
 message2));
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+            .withThrowable(e)
+            .withRawMessages(Collections.singletonList(message2));
     BulkWriterResponse response = new BulkWriterResponse();
     response.addAllErrors(e, tupleList);
 
@@ -173,8 +193,14 @@ public class BulkWriterComponentTest {
     bulkWriterComponent.write(sensorType, tuple1, message1, bulkMessageWriter, 
configurations, messageGetStrategy);
     bulkWriterComponent.write(sensorType, tuple2, message2, bulkMessageWriter, 
configurations, messageGetStrategy);
 
-    verifyStatic(times(1));
-    ErrorUtils.handleError(collector, error);
+    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
+            new Values(argThat(new 
MetronErrorJSONMatcher(expectedError1.getJSONObject()))));
+    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
+            new Values(argThat(new 
MetronErrorJSONMatcher(expectedError2.getJSONObject()))));
+    verify(collector, times(1)).ack(tuple1);
+    verify(collector, times(1)).ack(tuple2);
+    verify(collector, times(1)).reportError(e);
+    verifyNoMoreInteractions(collector);
   }
 
   @Test
@@ -182,19 +208,28 @@ public class BulkWriterComponentTest {
     Throwable e = new Exception("test exception");
     MetronError error1 = new MetronError()
             .withSensorType(Collections.singleton("sensor1"))
-            
.withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message1));
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+            .withThrowable(e)
+            .withRawMessages(Collections.singletonList(message1));
     MetronError error2 = new MetronError()
             .withSensorType(Collections.singleton("sensor2"))
-            
.withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message2));
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+            .withThrowable(e)
+            .withRawMessages(Collections.singletonList(message2));
 
     BulkWriterComponent<JSONObject> bulkWriterComponent = new 
BulkWriterComponent<>(collector);
     bulkWriterComponent.write("sensor1", tuple1, message1, bulkMessageWriter, 
configurations, messageGetStrategy);
     bulkWriterComponent.write("sensor2", tuple2, message2, bulkMessageWriter, 
configurations, messageGetStrategy);
     bulkWriterComponent.errorAll(e, messageGetStrategy);
 
-    verifyStatic(times(1));
-    ErrorUtils.handleError(collector, error1);
-    ErrorUtils.handleError(collector, error2);
+    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
+            new Values(argThat(new 
MetronErrorJSONMatcher(error1.getJSONObject()))));
+    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
+            new Values(argThat(new 
MetronErrorJSONMatcher(error2.getJSONObject()))));
+    verify(collector, times(1)).ack(tuple1);
+    verify(collector, times(1)).ack(tuple2);
+    verify(collector, times(2)).reportError(e);
+    verifyNoMoreInteractions(collector);
 
     bulkWriterComponent.write("sensor1", tuple1, message1, bulkMessageWriter, 
configurations, messageGetStrategy);
     verify(bulkMessageWriter, times(0)).write(sensorType, configurations, 
Collections.singletonList(tuple1), Collections.singletonList(message1));

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-stellar/stellar-common/README.md
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/README.md 
b/metron-stellar/stellar-common/README.md
index 468f358..5b0ec17 100644
--- a/metron-stellar/stellar-common/README.md
+++ b/metron-stellar/stellar-common/README.md
@@ -31,6 +31,12 @@ For a variety of components (threat intelligence triage and 
field transformation
     * [Advanced Usage](#advanced-usage)
     * [Implementation](#implementation)
 * [Stellar Configuration](#stellar-configuration)
+* [Stellar REST Client](#stellar-rest-client)
+    * [Configuration](#configuration)
+    * [Security](#security)
+    * [Examples](#examples)
+    * [Latency](#latency)
+    * [Response Handling](#response-handling)
 
 
 ## Introduction
@@ -169,6 +175,7 @@ Where:
 | [ `CHOP`](#chop)                                                             
                      |
 | [ `CHOMP`](#chomp)                                                           
                      |
 | [ `COUNT_MATCHES`](#count_matches)                                           
                      |
+| [ `DATE_FORMAT`](#date_format)
 | [ `DAY_OF_MONTH`](#day_of_month)                                             
                      |
 | [ `DAY_OF_WEEK`](#day_of_week)                                               
                      |
 | [ `DAY_OF_YEAR`](#day_of_year)                                               
                      |
@@ -247,7 +254,8 @@ Where:
 | [ `REDUCE`](#reduce)                                                         
                      |
 | [ `REGEXP_MATCH`](#regexp_match)                                             
                      |
 | [ `REGEXP_GROUP_VAL`](#regexp_group_val)                                     
                      |
-| [ `REGEXP_REPLACE`](#regexp_replace)                                         
                      |
+| [ `REGEXP_REPLACE`](#regexp_replace)
+| [ `REST_GET`](#rest_get)
 | [ `ROUND`](#round)                                                           
                      |
 | [ `SAMPLE_ADD`](../../metron-analytics/metron-statistics#sample_add)         
                      |
 | [ `SAMPLE_GET`](../../metron-analytics/metron-statistics#sample_get)         
                      |
@@ -379,6 +387,14 @@ Where:
     * substring/character - the substring or character to count, may be null.
   * Returns: the number of non-overlapping occurrences, 0 if either 
CharSequence is null.
 
+### `DATE_FORMAT`
+  * Description: Takes an epoch timestamp and converts it to a date format.
+  * Input:
+    * format - DateTime format as a String.
+    * timestampField - Optional epoch time in Long format.  Defaults to now.
+    * timezone - Optional timezone in String format.
+  * Returns: Formatted date.
+  
 ### `DAY_OF_MONTH`
   * Description: The numbered day within the month.  The first day within the 
month has a value of 1.
   * Input:
@@ -907,6 +923,13 @@ Where:
     * pattern - The proposed regex pattern
     * value - The value to replace the regex pattern
   * Returns: The modified input string with replaced values
+  
+### `REST_GET`
+  * Description: Performs a REST GET request and parses the JSON results into 
a map.
+  * Input:
+    * url - URI to the REST service
+    * rest_config - Optional - Map (in curly braces) of name:value pairs, each 
overriding the global config parameter of the same name. Default is the empty 
Map, meaning no overrides.
+  * Returns: JSON results as a Map
 
 ### `ROUND`
   * Description: Rounds a number to the nearest integer.  This is half-up 
rounding.
@@ -1590,3 +1613,85 @@ that specify what should be included when searching for 
Stellar functions.
 }
 ```
 
+## Stellar REST Client
+
+Stellar provides a REST Client with the `REST_GET` function.  This function 
depends on the Apache HttComponents library for
+executing Http requests.  The syntax is:
+```
+REST_GET( uri , optional config )
+```
+
+### Configuration
+
+The second argument is an optional Map of settings.  The following settings 
are available:
+
+* basic.auth.user - User name for basic authentication.
+* basic.auth.password.path - Path to the basic authentication password file 
stored in HDFS.
+* proxy.host - Proxy host.
+* proxy.port - Proxy port.
+* proxy.basic.auth.user - User name for proxy basic authentication.
+* proxy.basic.auth.password.path - Path to the proxy basic authentication 
password file stored in HDFS.
+* timeout - Stellar enforced hard timeout for the total request time. Defaults 
to 1000 ms.  HttpClient timeouts alone are insufficient to guarantee the hard 
timeout.
+* connect.timeout - Connect timeout exposed by the HttpClient object.
+* connection.request.timeout - Connection request timeout exposed by the 
HttpClient object.
+* socket.timeout - Socket timeout exposed by the HttpClient object.
+* response.codes.allowed - A list of response codes that are allowed.  All 
others will be treated as errors.  Defaults to `200`.
+* empty.content.override - The default value that will be returned on a 
successful request with empty content.  Defaults to null.
+* error.value.override - The default value that will be returned on an error.  
Defaults to null.
+* pooling.max.total - The maximum number of connections in the connection pool.
+* pooling.default.max.per.route - The default maximum number of connections 
per route in the connection pool.
+
+This Map of settings can also be stored in the global config 
`stellar.rest.settings` property.  For example, to configure basic 
authentication
+settings you would add this property to the global config:
+
+```
+{
+  "stellar.rest.settings": {
+    "basic.auth.user": "user",
+    "basic.auth.password.path": "/password/path"
+  }
+}
+```
+
+Any settings passed into the expression will take precedence over the global 
config settings.  The global config settings will take precedence over the 
defaults.
+
+For security purposes, passwords are read from a file in HDFS.  Passwords are 
read as is including any new lines or spaces. Be careful not to include these 
in the file unless they are specifically part of the password.
+
+### Security
+
+At this time, only basic authentication is supported.  
+
+### Examples
+
+Perform a simple GET request with no authentication:
+```
+[Stellar]>>> REST_GET('http://httpbin.org/get')
+{args={}, headers={Accept=application/json, Accept-Encoding=gzip,deflate, 
Cache-Control=max-age=259200, Connection=close, Host=httpbin.org, 
User-Agent=Apache-HttpClient/4.3.2 (java 1.5)}, origin=127.0.0.1, 
136.62.241.236, url=http://httpbin.org/get}
+```
+
+Perform a GET request using basic authentication:
+```
+[Stellar]>>> config := {'basic.auth.user': 'user', 'basic.auth.password.path': 
'/password/path'}
+{basic.auth.user=user, basic.auth.password.path=/password/path}
+[Stellar]>>> REST_GET('http://httpbin.org/basic-auth/user/passwd', config)
+{authenticated=true, user=user}
+```
+
+Perform a GET request using a proxy:
+```
+[Stellar]>>> config := {'proxy.host': 'node1', 'proxy.port': 3128, 
'proxy.basic.auth.user': 'user', 'proxy.basic.auth.password.path': 
'/proxy/password/path'}
+{proxy.basic.auth.password.path=/proxy/password/path, proxy.port=3128, 
proxy.host=node1, proxy.basic.auth.user=user}
+[Stellar]>>> REST_GET('http://httpbin.org/get', config)
+{args={}, headers={Accept=application/json, Accept-Encoding=gzip,deflate, 
Cache-Control=max-age=259200, Connection=close, Host=httpbin.org, 
User-Agent=Apache-HttpClient/4.3.2 (java 1.5)}, origin=127.0.0.1, 
136.62.241.236, url=http://httpbin.org/get}
+```
+
+### Latency
+
+Performing a REST request will introduce latency in a streaming pipeline.  
Therefore this function should only be used for low volume telemetries that are 
unlikely to be
+affected by higher latency operations.  The `timeout` setting can be used to 
guarantee that requests complete within the configured time.
+
+### Response Handling
+
+In cases of Http errors, timeouts, etc this function will log the error and 
return null.  Only a status code of `200` is considered successful
+by default but this can be changed with the `response.codes.allowed` setting.  
Values returned on errors or emtpy content can be changed from 
+the default value of null using the `error.value.override` and 
`empty.content.override` respectively.

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-stellar/stellar-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/pom.xml 
b/metron-stellar/stellar-common/pom.xml
index 42e649d..b8be521 100644
--- a/metron-stellar/stellar-common/pom.xml
+++ b/metron-stellar/stellar-common/pom.xml
@@ -35,6 +35,17 @@
           <version>${global_caffeine_version}</version>
         </dependency>
         <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+          <version>${global_hadoop_version}</version>
+          <exclusions>
+            <exclusion>
+              <groupId>asm</groupId>
+              <artifactId>asm</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-auth</artifactId>
             <version>${global_hadoop_version}</version>
@@ -213,12 +224,41 @@
             <version>1.0.2</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${global_httpclient_version}</version>
+        </dependency>
+        <dependency>
             <!-- junit dependency added with default scope to allow inclusion 
of StellarProcessorUtils in main jar.
                  It is excluded from the uber-jar. -->
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <version>${global_junit_version}</version>
         </dependency>
+        <dependency>
+          <groupId>org.mock-server</groupId>
+          <artifactId>mockserver-netty</artifactId>
+          <version>3.10.8</version>
+          <scope>test</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>ch.qos.logback</groupId>
+              <artifactId>logback-classic</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>org.mock-server</groupId>
+          <artifactId>mockserver-client-java</artifactId>
+          <version>3.10.8</version>
+          <scope>test</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>ch.qos.logback</groupId>
+              <artifactId>logback-classic</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
     </dependencies>
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/cli/StellarShell.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/cli/StellarShell.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/cli/StellarShell.java
index c81df61..083bad2 100644
--- 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/cli/StellarShell.java
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/cli/StellarShell.java
@@ -36,6 +36,7 @@ import 
org.apache.metron.stellar.common.shell.StellarAutoCompleter;
 import org.apache.metron.stellar.common.shell.StellarResult;
 import org.apache.metron.stellar.common.shell.StellarShellExecutor;
 import org.apache.metron.stellar.common.utils.JSONUtils;
+import org.apache.metron.stellar.dsl.StellarFunctions;
 import org.jboss.aesh.complete.CompleteOperation;
 import org.jboss.aesh.complete.Completion;
 import org.jboss.aesh.console.AeshConsoleCallback;
@@ -337,6 +338,7 @@ public class StellarShell extends AeshConsoleCallback 
implements Completion {
   private void handleQuit() {
     try {
       console.stop();
+      StellarFunctions.close();
     } catch (Throwable e) {
       e.printStackTrace();
     }

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java
index 0f04151..d31580b 100644
--- 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java
@@ -67,6 +67,11 @@ public class Context implements Serializable {
      * is in the cache, then the cached result will be returned instead of 
recomputing.
      */
     CACHE
+    ,
+    /**
+     * This capability indicates that a http client (i.e. a 
CloseableHttpClient, specifically) is available.
+     */
+    HTTP_CLIENT
   }
 
   public enum ActivityType {

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/StellarFunction.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/StellarFunction.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/StellarFunction.java
index efdd185..4fabfaf 100644
--- 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/StellarFunction.java
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/StellarFunction.java
@@ -17,10 +17,17 @@
  */
 package org.apache.metron.stellar.dsl;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.List;
 
-public interface StellarFunction {
+public interface StellarFunction extends Closeable {
   Object apply(List<Object> args, Context context) throws ParseException;
   void initialize(Context context);
   boolean isInitialized();
+
+  @Override
+  default void close() throws IOException {
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/StellarFunctions.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/StellarFunctions.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/StellarFunctions.java
index dfec90e..73df82f 100644
--- 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/StellarFunctions.java
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/StellarFunctions.java
@@ -18,6 +18,7 @@
 
 package org.apache.metron.stellar.dsl;
 
+import java.io.IOException;
 import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
 import 
org.apache.metron.stellar.dsl.functions.resolver.SingletonFunctionResolver;
 
@@ -30,4 +31,8 @@ public class StellarFunctions {
   public static void initialize(Context context) {
     SingletonFunctionResolver.getInstance().initialize(context);
   }
+
+  public static void close() throws IOException {
+    SingletonFunctionResolver.getInstance().close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DateFunctions.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DateFunctions.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DateFunctions.java
index 212d6e9..17f5f8d 100644
--- 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DateFunctions.java
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DateFunctions.java
@@ -28,6 +28,7 @@ import org.apache.metron.stellar.common.utils.ConversionUtils;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
+import java.util.Date;
 import java.util.List;
 import java.util.Optional;
 import java.util.TimeZone;
@@ -109,6 +110,13 @@ public class DateFunctions {
     return sdf.parse(date).getTime();
   }
 
+  public static String getDateFormat(String format, Optional<Long> epochTime, 
Optional<String> timezone) {
+    Long time = epochTime.orElseGet(System::currentTimeMillis);
+    TimezonedFormat fmt = timezone.map(s -> new TimezonedFormat(format, 
s)).orElseGet(() -> new TimezonedFormat(format));
+    SimpleDateFormat sdf = formatCache.get(fmt).get();
+    return sdf.format(new Date(time));
+  }
+
 
   /**
    * Stellar Function: TO_EPOCH_TIMESTAMP
@@ -144,6 +152,40 @@ public class DateFunctions {
     }
   }
 
+  @Stellar( name="DATE_FORMAT",
+          description = "Takes an epoch timestamp and converts it to a date 
format.",
+          params = {"format - DateTime format as a String."
+                  , "timestampField - Optional epoch time in Long format.  
Defaults to now."
+                  , "timezone - Optional timezone in String format."},
+          returns="Formatted date."
+  )
+  public static class DateFormat extends BaseStellarFunction {
+
+    public Object apply(List<Object> objects) {
+      int size = objects.size();
+      Optional<Object> formatObj = Optional.ofNullable(objects.get(0));
+      Optional<Long> epochObj = Optional.empty();
+      Optional<String> tzObj = Optional.empty();
+      if (size > 1) {
+        if (size == 2) {
+          if (objects.get(1) == null) {
+            return null;
+          }
+          epochObj = objects.get(1) instanceof Long ? Optional.of((Long) 
objects.get(1)) : Optional.empty();
+          tzObj = objects.get(1) instanceof String ? Optional.of((String) 
objects.get(1)) : Optional.empty();
+        } else {
+          epochObj = Optional.ofNullable((Long) objects.get(1));
+          tzObj = Optional.ofNullable((String) objects.get(2));
+        }
+      }
+      if(formatObj.isPresent()) {
+        return getDateFormat(formatObj.get().toString(), epochObj, tzObj);
+      } else {
+        return null;
+      }
+    }
+  }
+
   /**
    * Gets the value from a list of arguments.
    *

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestConfig.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestConfig.java
new file mode 100644
index 0000000..62d89f7
--- /dev/null
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestConfig.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.stellar.dsl.functions;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * A Map containing the Stellar REST settings.
+ */
+public class RestConfig extends HashMap<String, Object> {
+
+  /**
+   * A global config prefix used for storing Stellar REST settings.
+   */
+  public final static String STELLAR_REST_SETTINGS = "stellar.rest.settings";
+
+  /**
+   * User name for basic authentication.
+   */
+  public final static String BASIC_AUTH_USER = "basic.auth.user";
+
+  /**
+   * Path to the basic authentication password file stored in HDFS.
+   */
+  public final static String BASIC_AUTH_PASSWORD_PATH = 
"basic.auth.password.path";
+
+  /**
+   * Proxy host.
+   */
+  public final static String PROXY_HOST = "proxy.host";
+
+  /**
+   * Proxy port.
+   */
+  public final static String PROXY_PORT = "proxy.port";
+
+  /**
+   * User name for proxy basic authentication.
+   */
+  public final static String PROXY_BASIC_AUTH_USER = "proxy.basic.auth.user";
+
+  /**
+   * Path to the proxy basic authentication password file stored in HDFS.
+   */
+  public final static String PROXY_BASIC_AUTH_PASSWORD_PATH = 
"proxy.basic.auth.password.path";
+
+  /**
+   * Hard timeout for the total request time.
+   */
+  public final static String TIMEOUT = "timeout";
+
+  /**
+   * Timeouts exposed by the HttpClient object.
+   */
+  public final static String CONNECT_TIMEOUT = "connect.timeout";
+  public final static String CONNECTION_REQUEST_TIMEOUT = 
"connection.request.timeout";
+  public final static String SOCKET_TIMEOUT = "socket.timeout";
+
+  /**
+   * A list of response codes that are allowed.  All others will be treated as 
errors.
+   */
+  public final static String RESPONSE_CODES_ALLOWED = "response.codes.allowed";
+
+  /**
+   * The default value that will be returned on a successful request with 
empty content.  Default is null.
+   */
+  public final static String EMPTY_CONTENT_OVERRIDE = "empty.content.override";
+
+  /**
+   * The default value that will be returned on an error.  Default is null.
+   */
+  public final static String ERROR_VALUE_OVERRIDE = "error.value.override";
+
+  /**
+   * The maximum number of connections in the connection pool.
+   */
+  public final static String POOLING_MAX_TOTAL = "pooling.max.total";
+
+  /**
+   * The default maximum number of connections per route in the connection 
pool.
+   */
+  public final static String POOLING_DEFAULT_MAX_PER_RUOTE = 
"pooling.default.max.per.route";
+
+
+  public RestConfig() {
+    put(TIMEOUT, 1000);
+    put(RESPONSE_CODES_ALLOWED, Collections.singletonList(200));
+  }
+
+  public String getBasicAuthUser() {
+    return (String) get(BASIC_AUTH_USER);
+  }
+
+  public String getBasicAuthPasswordPath() {
+    return (String) get(BASIC_AUTH_PASSWORD_PATH);
+  }
+
+  public String getProxyHost() {
+    return (String) get(PROXY_HOST);
+  }
+
+  public Integer getProxyPort() {
+    return (Integer) get(PROXY_PORT);
+  }
+
+  public String getProxyBasicAuthUser() {
+    return (String) get(PROXY_BASIC_AUTH_USER);
+  }
+
+  public String getProxyBasicAuthPasswordPath() {
+    return (String) get(PROXY_BASIC_AUTH_PASSWORD_PATH);
+  }
+
+  public Integer getTimeout() {
+    return (Integer) get(TIMEOUT);
+  }
+
+  public Integer getConnectTimeout() {
+    return (Integer) get(CONNECT_TIMEOUT);
+  }
+
+  public Integer getConnectionRequestTimeout() {
+    return (Integer) get(CONNECTION_REQUEST_TIMEOUT);
+  }
+
+  public Integer getSocketTimeout() {
+    return (Integer) get(SOCKET_TIMEOUT);
+  }
+
+  public List<Integer> getResponseCodesAllowed() {
+    return (List<Integer>) get(RESPONSE_CODES_ALLOWED);
+  }
+
+  public Object getEmptyContentOverride() {
+    return get(EMPTY_CONTENT_OVERRIDE);
+  }
+
+  public Object getErrorValueOverride() {
+    return get(ERROR_VALUE_OVERRIDE);
+  }
+
+  public Integer getPoolingMaxTotal() {
+    return (Integer) get(POOLING_MAX_TOTAL);
+  }
+
+  public Integer getPoolingDefaultMaxPerRoute() {
+    return (Integer) get(POOLING_DEFAULT_MAX_PER_RUOTE);
+  }
+}

Reply via email to