http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index c7292ab..9ea7912 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -36,6 +36,7 @@ import java.util.Map;
 import java.util.Properties;
 import javax.annotation.Nullable;
 import kafka.consumer.ConsumerIterator;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +46,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.metron.common.Constants;
+import org.apache.metron.common.utils.HDFSUtils;
 import org.apache.metron.integration.BaseIntegrationTest;
 import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.Processor;
@@ -55,12 +57,18 @@ import org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.integration.components.MRComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
 import org.apache.metron.integration.utils.KafkaUtil;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.Statusable;
 import org.apache.metron.pcap.PacketInfo;
 import org.apache.metron.pcap.PcapHelper;
 import org.apache.metron.pcap.PcapMerger;
+import org.apache.metron.pcap.config.FixedPcapConfig;
+import org.apache.metron.pcap.config.PcapOptions;
 import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
 import org.apache.metron.pcap.filter.query.QueryPcapFilter;
+import org.apache.metron.pcap.finalizer.PcapFinalizerStrategies;
 import org.apache.metron.pcap.mr.PcapJob;
+import org.apache.metron.pcap.query.PcapCli;
 import org.apache.metron.spout.pcap.Endianness;
 import org.apache.metron.spout.pcap.deserializer.Deserializers;
 import org.apache.metron.test.utils.UnitTestHelper;
@@ -73,13 +81,22 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
   final static String KAFKA_TOPIC = "pcap";
   private static String BASE_DIR = "pcap";
   private static String DATA_DIR = BASE_DIR + "/data_dir";
-  private static String QUERY_DIR = BASE_DIR + "/query";
+  private static String INTERIM_RESULT = BASE_DIR + "/query";
+  private static String OUTPUT_DIR = BASE_DIR + "/output";
+  private static final int MAX_RETRIES = 30;
+  private static final int SLEEP_MS = 500;
   private String topologiesDir = "src/main/flux";
   private String targetDir = "target";
 
-  private static void clearOutDir(File outDir) {
-    for(File f : outDir.listFiles()) {
-      f.delete();
+  private static void clearOutDirs(File... dirs) throws IOException {
+    for(File dir: dirs) {
+      for(File f : dir.listFiles()) {
+        if (f.isDirectory()) {
+          FileUtils.deleteDirectory(f);
+        } else {
+          f.delete();
+        }
+      }
     }
   }
   private static int numFiles(File outDir, Configuration config) {
@@ -158,10 +175,10 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
       topologiesDir = UnitTestHelper.findDir("topologies");
     }
     targetDir = UnitTestHelper.findDir("target");
-    final File outDir = getOutDir(targetDir);
-    final File queryDir = getQueryDir(targetDir);
-    clearOutDir(outDir);
-    clearOutDir(queryDir);
+    final File inputDir = getDir(targetDir, DATA_DIR);
+    final File interimResultDir = getDir(targetDir, INTERIM_RESULT);
+    final File outputDir = getDir(targetDir, OUTPUT_DIR);
+    clearOutDirs(inputDir, interimResultDir, outputDir);
 
     File baseDir = new File(new File(targetDir), BASE_DIR);
     //Assert.assertEquals(0, numFiles(outDir));
@@ -175,7 +192,7 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
       setProperty("topology.worker.childopts", "");
       setProperty("spout.kafka.topic.pcap", KAFKA_TOPIC);
       setProperty("kafka.pcap.start", "EARLIEST");
-      setProperty("kafka.pcap.out", outDir.getAbsolutePath());
+      setProperty("kafka.pcap.out", inputDir.getAbsolutePath());
       setProperty("kafka.pcap.numPackets", "2");
       setProperty("kafka.pcap.maxTimeMS", "200000000");
       setProperty("kafka.pcap.ts_granularity", "NANOSECONDS");
@@ -219,7 +236,7 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
       runner.process(new Processor<Void>() {
         @Override
         public ReadinessState process(ComponentRunner runner) {
-          int numFiles = numFiles(outDir, mr.getConfiguration());
+          int numFiles = numFiles(inputDir, mr.getConfiguration());
           int expectedNumFiles = pcapEntries.size() / 2;
           if (numFiles == expectedNumFiles) {
             return ReadinessState.READY;
@@ -233,160 +250,222 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
           return null;
         }
       });
-      PcapJob job = new PcapJob();
+
+      FixedPcapConfig configuration = new 
FixedPcapConfig(PcapCli.PREFIX_STRATEGY);
+      Configuration hadoopConf = new Configuration();
+      PcapOptions.JOB_NAME.put(configuration, "jobName");
+      PcapOptions.HADOOP_CONF.put(configuration, hadoopConf);
+      PcapOptions.FILESYSTEM.put(configuration, FileSystem.get(hadoopConf));
+      PcapOptions.BASE_PATH.put(configuration, new 
Path(inputDir.getAbsolutePath()));
+      PcapOptions.BASE_INTERIM_RESULT_PATH.put(configuration, new 
Path(interimResultDir.getAbsolutePath()));
+      PcapOptions.START_TIME_NS.put(configuration, getTimestamp(4, 
pcapEntries));
+      PcapOptions.END_TIME_NS.put(configuration, getTimestamp(5, pcapEntries));
+      PcapOptions.NUM_REDUCERS.put(configuration, 10);
+      PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 2);
+      PcapOptions.FINAL_OUTPUT_PATH.put(configuration, new 
Path(outputDir.getAbsolutePath()));
       {
         //Ensure that only two pcaps are returned when we look at 4 and 5
-        Iterable<byte[]> results =
-                job.query(new Path(outDir.getAbsolutePath())
-                        , new Path(queryDir.getAbsolutePath())
-                        , getTimestamp(4, pcapEntries)
-                        , getTimestamp(5, pcapEntries)
-                        , 10
-                        , new HashMap<>()
-                        , new Configuration()
-                        , FileSystem.get(new Configuration())
-                        , new FixedPcapFilter.Configurator()
-                );
-        assertInOrder(results);
-        Assert.assertEquals(Iterables.size(results), 2);
+        PcapOptions.FILTER_IMPL.put(configuration, new 
FixedPcapFilter.Configurator());
+        PcapOptions.FIELDS.put(configuration, new HashMap());
+        PcapJob<Map<String, String>> job = new PcapJob<>();
+        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
+        Assert.assertEquals(JobStatus.State.RUNNING, 
results.getStatus().getState());
+        waitForJob(results);
+
+        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+          try {
+            return HDFSUtils.readBytes(path);
+          } catch (IOException e) {
+            throw new IllegalStateException(e);
+          }
+        });
+        assertInOrder(bytes);
+        Assert.assertEquals(results.get().getSize(), 1);
       }
       {
         // Ensure that only two pcaps are returned when we look at 4 and 5
         // test with empty query filter
-        Iterable<byte[]> results =
-                job.query(new Path(outDir.getAbsolutePath())
-                        , new Path(queryDir.getAbsolutePath())
-                        , getTimestamp(4, pcapEntries)
-                        , getTimestamp(5, pcapEntries)
-                        , 10
-                        , ""
-                        , new Configuration()
-                        , FileSystem.get(new Configuration())
-                        , new QueryPcapFilter.Configurator()
-                );
-        assertInOrder(results);
-        Assert.assertEquals(Iterables.size(results), 2);
+        PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
+        PcapOptions.FIELDS.put(configuration, "");
+        PcapJob<String> job = new PcapJob<>();
+        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
+        Assert.assertEquals(JobStatus.State.RUNNING, 
results.getStatus().getState());
+        waitForJob(results);
+
+        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+          try {
+            return HDFSUtils.readBytes(path);
+          } catch (IOException e) {
+            throw new IllegalStateException(e);
+          }
+        });
+        assertInOrder(bytes);
+        Assert.assertEquals(results.get().getSize(), 1);
       }
       {
        //ensure that none get returned since that destination IP address isn't in the dataset
-        Iterable<byte[]> results =
-                job.query(new Path(outDir.getAbsolutePath())
-                        , new Path(queryDir.getAbsolutePath())
-                        , getTimestamp(0, pcapEntries)
-                        , getTimestamp(1, pcapEntries)
-                        , 10
-                        , new HashMap<String, String>() {{
-                          put(Constants.Fields.DST_ADDR.getName(), 
"207.28.210.1");
-                        }}
-                        , new Configuration()
-                        , FileSystem.get(new Configuration())
-                        , new FixedPcapFilter.Configurator()
-                );
-        assertInOrder(results);
-        Assert.assertEquals(Iterables.size(results), 0);
+        PcapOptions.FILTER_IMPL.put(configuration, new 
FixedPcapFilter.Configurator());
+        PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
+          put(Constants.Fields.DST_ADDR.getName(), "207.28.210.1");
+        }});
+        PcapJob<Map<String, String>> job = new PcapJob<>();
+        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
+        Assert.assertEquals(JobStatus.State.RUNNING, 
results.getStatus().getState());
+        waitForJob(results);
+
+        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+          try {
+            return HDFSUtils.readBytes(path);
+          } catch (IOException e) {
+            throw new IllegalStateException(e);
+          }
+        });
+        assertInOrder(bytes);
+        Assert.assertEquals(results.get().getSize(), 0);
       }
       {
         // ensure that none get returned since that destination IP address isn't in the dataset
         // test with query filter
-        Iterable<byte[]> results =
-                job.query(new Path(outDir.getAbsolutePath())
-                        , new Path(queryDir.getAbsolutePath())
-                        , getTimestamp(0, pcapEntries)
-                        , getTimestamp(1, pcapEntries)
-                        , 10
-                        , "ip_dst_addr == '207.28.210.1'"
-                        , new Configuration()
-                        , FileSystem.get(new Configuration())
-                        , new QueryPcapFilter.Configurator()
-                );
-        assertInOrder(results);
-        Assert.assertEquals(Iterables.size(results), 0);
+        PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
+        PcapOptions.FIELDS.put(configuration, "ip_dst_addr == '207.28.210.1'");
+        PcapJob<String> job = new PcapJob<>();
+        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
+        Assert.assertEquals(JobStatus.State.RUNNING, 
results.getStatus().getState());
+        waitForJob(results);
+
+        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+          try {
+            return HDFSUtils.readBytes(path);
+          } catch (IOException e) {
+            throw new IllegalStateException(e);
+          }
+        });
+        assertInOrder(bytes);
+        Assert.assertEquals(results.get().getSize(), 0);
       }
       {
         //same with protocol as before with the destination addr
-        Iterable<byte[]> results =
-                job.query(new Path(outDir.getAbsolutePath())
-                        , new Path(queryDir.getAbsolutePath())
-                        , getTimestamp(0, pcapEntries)
-                        , getTimestamp(1, pcapEntries)
-                        , 10
-                        , new HashMap<String, String>() {{
-                          put(Constants.Fields.PROTOCOL.getName(), "foo");
-                        }}
-                        , new Configuration()
-                        , FileSystem.get(new Configuration())
-                        , new FixedPcapFilter.Configurator()
-                );
-        assertInOrder(results);
-        Assert.assertEquals(Iterables.size(results), 0);
+        PcapOptions.FILTER_IMPL.put(configuration, new 
FixedPcapFilter.Configurator());
+        PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
+          put(Constants.Fields.PROTOCOL.getName(), "foo");
+        }});
+        PcapJob<Map<String, String>> job = new PcapJob<>();
+        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
+        Assert.assertEquals(JobStatus.State.RUNNING, 
results.getStatus().getState());
+        waitForJob(results);
+
+        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+          try {
+            return HDFSUtils.readBytes(path);
+          } catch (IOException e) {
+            throw new IllegalStateException(e);
+          }
+        });
+        assertInOrder(bytes);
+        Assert.assertEquals(results.get().getSize(), 0);
       }
       {
         //same with protocol as before with the destination addr
         //test with query filter
-        Iterable<byte[]> results =
-                job.query(new Path(outDir.getAbsolutePath())
-                        , new Path(queryDir.getAbsolutePath())
-                        , getTimestamp(0, pcapEntries)
-                        , getTimestamp(1, pcapEntries)
-                        , 10
-                        , "protocol == 'foo'"
-                        , new Configuration()
-                        , FileSystem.get(new Configuration())
-                        , new QueryPcapFilter.Configurator()
-                );
-        assertInOrder(results);
-        Assert.assertEquals(Iterables.size(results), 0);
+        PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
+        PcapOptions.FIELDS.put(configuration, "protocol == 'foo'");
+        PcapJob<String> job = new PcapJob<>();
+        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
+        Assert.assertEquals(JobStatus.State.RUNNING, 
results.getStatus().getState());
+        waitForJob(results);
+
+        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+          try {
+            return HDFSUtils.readBytes(path);
+          } catch (IOException e) {
+            throw new IllegalStateException(e);
+          }
+        });
+        assertInOrder(bytes);
+        Assert.assertEquals(results.get().getSize(), 0);
       }
       {
         //make sure I get them all.
-        Iterable<byte[]> results =
-                job.query(new Path(outDir.getAbsolutePath())
-                        , new Path(queryDir.getAbsolutePath())
-                        , getTimestamp(0, pcapEntries)
-                        , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
-                        , 10
-                        , new HashMap<>()
-                        , new Configuration()
-                        , FileSystem.get(new Configuration())
-                        , new FixedPcapFilter.Configurator()
-                );
-        assertInOrder(results);
-        Assert.assertEquals(Iterables.size(results), pcapEntries.size());
+        PcapOptions.FILTER_IMPL.put(configuration, new 
FixedPcapFilter.Configurator());
+        PcapOptions.FIELDS.put(configuration, new HashMap<>());
+        PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, 
pcapEntries));
+        PcapOptions.END_TIME_NS.put(configuration, 
getTimestamp(pcapEntries.size()-1, pcapEntries) + 1);
+        PcapJob<Map<String, String>> job = new PcapJob<>();
+        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
+        Assert.assertEquals(JobStatus.State.RUNNING, 
results.getStatus().getState());
+        waitForJob(results);
+
+        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+          try {
+            return HDFSUtils.readBytes(path);
+          } catch (IOException e) {
+            throw new IllegalStateException(e);
+          }
+        });
+        assertInOrder(bytes);
+        Assert.assertEquals(10, results.get().getSize());
       }
       {
         //make sure I get them all.
         //with query filter
-        Iterable<byte[]> results =
-                job.query(new Path(outDir.getAbsolutePath())
-                        , new Path(queryDir.getAbsolutePath())
-                        , getTimestamp(0, pcapEntries)
-                        , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
-                        , 10
-                        , ""
-                        , new Configuration()
-                        , FileSystem.get(new Configuration())
-                        , new QueryPcapFilter.Configurator()
-                );
-        assertInOrder(results);
-        Assert.assertEquals(Iterables.size(results), pcapEntries.size());
+        PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
+        PcapOptions.FIELDS.put(configuration, "");
+        PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, 
pcapEntries));
+        PcapOptions.END_TIME_NS.put(configuration, 
getTimestamp(pcapEntries.size()-1, pcapEntries) + 1);
+        PcapJob<String> job = new PcapJob<>();
+        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
+        Assert.assertEquals(JobStatus.State.RUNNING, 
results.getStatus().getState());
+        waitForJob(results);
+
+        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+          try {
+            return HDFSUtils.readBytes(path);
+          } catch (IOException e) {
+            throw new IllegalStateException(e);
+          }
+        });
+        assertInOrder(bytes);
+        Assert.assertEquals(10, results.get().getSize());
       }
       {
-        Iterable<byte[]> results =
-                job.query(new Path(outDir.getAbsolutePath())
-                        , new Path(queryDir.getAbsolutePath())
-                        , getTimestamp(0, pcapEntries)
-                        , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
-                        , 10
-                        , new HashMap<String, String>() {{
-                          put(Constants.Fields.DST_PORT.getName(), "22");
-                        }}
-                        , new Configuration()
-                        , FileSystem.get(new Configuration())
-                        , new FixedPcapFilter.Configurator()
-                );
-        assertInOrder(results);
-        Assert.assertTrue(Iterables.size(results) > 0);
-        Assert.assertEquals(Iterables.size(results)
+        PcapOptions.FILTER_IMPL.put(configuration, new 
FixedPcapFilter.Configurator());
+        PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
+          put(Constants.Fields.DST_PORT.getName(), "22");
+        }});
+        PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 1);
+        PcapJob<Map<String, String>> job = new PcapJob<>();
+        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
+        Assert.assertEquals(JobStatus.State.RUNNING, 
results.getStatus().getState());
+        waitForJob(results);
+
+        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+          try {
+            return HDFSUtils.readBytes(path);
+          } catch (IOException e) {
+            throw new IllegalStateException(e);
+          }
+        });
+        assertInOrder(bytes);
+        Assert.assertTrue(results.get().getSize() > 0);
+        Assert.assertEquals(Iterables.size(bytes)
                 , Iterables.size(filterPcaps(pcapEntries, new 
Predicate<JSONObject>() {
                           @Override
                           public boolean apply(@Nullable JSONObject input) {
@@ -397,74 +476,63 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
                 )
         );
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        PcapMerger.merge(baos, Iterables.partition(results, 
1).iterator().next());
-        Assert.assertTrue(baos.toByteArray().length > 0);
-      }
-      {
-        //test with query filter and byte array matching
-        Iterable<byte[]> results =
-                job.query(new Path(outDir.getAbsolutePath())
-                        , new Path(queryDir.getAbsolutePath())
-                        , getTimestamp(0, pcapEntries)
-                        , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
-                        , 10
-                        , 
"BYTEARRAY_MATCHER('2f56abd814bc56420489ca38e7faf8cec3d4', packet)"
-                        , new Configuration()
-                        , FileSystem.get(new Configuration())
-                        , new QueryPcapFilter.Configurator()
-                );
-        assertInOrder(results);
-        Assert.assertEquals(1, Iterables.size(results));
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        PcapMerger.merge(baos, Iterables.partition(results, 
1).iterator().next());
+        PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
         Assert.assertTrue(baos.toByteArray().length > 0);
       }
       {
+        //same with protocol as before with the destination addr
         //test with query filter
-        Iterable<byte[]> results =
-                job.query(new Path(outDir.getAbsolutePath())
-                        , new Path(queryDir.getAbsolutePath())
-                        , getTimestamp(0, pcapEntries)
-                        , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
-                        , 10
-                        , "ip_dst_port == 22"
-                        , new Configuration()
-                        , FileSystem.get(new Configuration())
-                        , new QueryPcapFilter.Configurator()
-                );
-        assertInOrder(results);
-        Assert.assertTrue(Iterables.size(results) > 0);
-        Assert.assertEquals(Iterables.size(results)
+        PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
+        PcapOptions.FIELDS.put(configuration, "ip_dst_port == 22");
+        PcapJob<String> job = new PcapJob<>();
+        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
+        Assert.assertEquals(JobStatus.State.RUNNING, 
results.getStatus().getState());
+        waitForJob(results);
+
+        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+          try {
+            return HDFSUtils.readBytes(path);
+          } catch (IOException e) {
+            throw new IllegalStateException(e);
+          }
+        });
+        assertInOrder(bytes);
+        Assert.assertEquals(Iterables.size(bytes)
                 , Iterables.size(filterPcaps(pcapEntries, new 
Predicate<JSONObject>() {
                           @Override
                           public boolean apply(@Nullable JSONObject input) {
                             Object prt = 
input.get(Constants.Fields.DST_PORT.getName());
-                            return prt != null && (Long) prt == 22;
+                            return prt != null && prt.toString().equals("22");
                           }
                         }, withHeaders)
                 )
         );
-        assertInOrder(results);
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        PcapMerger.merge(baos, Iterables.partition(results, 
1).iterator().next());
+        PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
         Assert.assertTrue(baos.toByteArray().length > 0);
       }
       {
-        //test with query filter
-        Iterable<byte[]> results =
-                job.query(new Path(outDir.getAbsolutePath())
-                        , new Path(queryDir.getAbsolutePath())
-                        , getTimestamp(0, pcapEntries)
-                        , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
-                        , 10
-                        , "ip_dst_port > 20 and ip_dst_port < 55792"
-                        , new Configuration()
-                        , FileSystem.get(new Configuration())
-                        , new QueryPcapFilter.Configurator()
-                );
-        assertInOrder(results);
-        Assert.assertTrue(Iterables.size(results) > 0);
-        Assert.assertEquals(Iterables.size(results)
+        // test with query filter ip_dst_port > 20 and ip_dst_port < 55792
+        PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
+        PcapOptions.FIELDS.put(configuration, "ip_dst_port > 20 and 
ip_dst_port < 55792");
+        PcapJob<String> job = new PcapJob<>();
+        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
+        Assert.assertEquals(JobStatus.State.RUNNING, 
results.getStatus().getState());
+        waitForJob(results);
+
+        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+          try {
+            return HDFSUtils.readBytes(path);
+          } catch (IOException e) {
+            throw new IllegalStateException(e);
+          }
+        });
+        assertInOrder(bytes);
+        Assert.assertEquals(Iterables.size(bytes)
                 , Iterables.size(filterPcaps(pcapEntries, new 
Predicate<JSONObject>() {
                           @Override
                           public boolean apply(@Nullable JSONObject input) {
@@ -474,63 +542,92 @@ public class PcapTopologyIntegrationTest extends 
BaseIntegrationTest {
                         }, withHeaders)
                 )
         );
-        assertInOrder(results);
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        PcapMerger.merge(baos, Iterables.partition(results, 
1).iterator().next());
+        PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
         Assert.assertTrue(baos.toByteArray().length > 0);
       }
       {
-        //test with query filter
-        Iterable<byte[]> results =
-                job.query(new Path(outDir.getAbsolutePath())
-                        , new Path(queryDir.getAbsolutePath())
-                        , getTimestamp(0, pcapEntries)
-                        , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
-                        , 10
-                        , "ip_dst_port > 55790"
-                        , new Configuration()
-                        , FileSystem.get(new Configuration())
-                        , new QueryPcapFilter.Configurator()
-                );
-        assertInOrder(results);
-        Assert.assertTrue(Iterables.size(results) > 0);
-        Assert.assertEquals(Iterables.size(results)
+        //test with query filter ip_dst_port > 55790
+        PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
+        PcapOptions.FIELDS.put(configuration, "ip_dst_port > 55790");
+        PcapJob<String> job = new PcapJob<>();
+        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
+        Assert.assertEquals(JobStatus.State.RUNNING, 
results.getStatus().getState());
+        waitForJob(results);
+
+        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+          try {
+            return HDFSUtils.readBytes(path);
+          } catch (IOException e) {
+            throw new IllegalStateException(e);
+          }
+        });
+        assertInOrder(bytes);
+        Assert.assertEquals(Iterables.size(bytes)
                 , Iterables.size(filterPcaps(pcapEntries, new 
Predicate<JSONObject>() {
-                  @Override
-                  public boolean apply(@Nullable JSONObject input) {
-                    Object prt = 
input.get(Constants.Fields.DST_PORT.getName());
-                    return prt != null && (Long) prt > 55790;
-                  }
-                }, withHeaders)
+                          @Override
+                          public boolean apply(@Nullable JSONObject input) {
+                            Object prt = 
input.get(Constants.Fields.DST_PORT.getName());
+                            return prt != null && (Long) prt > 55790;
+                          }
+                        }, withHeaders)
                 )
         );
-        assertInOrder(results);
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        PcapMerger.merge(baos, Iterables.partition(results, 
1).iterator().next());
+        PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
+        Assert.assertTrue(baos.toByteArray().length > 0);
+      }
+      {
+        //test with query filter and byte array matching
+        PcapOptions.FILTER_IMPL.put(configuration, new 
QueryPcapFilter.Configurator());
+        PcapOptions.FIELDS.put(configuration, 
"BYTEARRAY_MATCHER('2f56abd814bc56420489ca38e7faf8cec3d4', packet)");
+        PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, 
pcapEntries));
+        PcapOptions.END_TIME_NS.put(configuration, 
getTimestamp(pcapEntries.size()-1, pcapEntries) + 1);
+        PcapJob<String> job = new PcapJob<>();
+        Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, 
configuration);
+        Assert.assertEquals(Statusable.JobType.MAP_REDUCE, 
results.getJobType());
+        Assert.assertEquals(JobStatus.State.RUNNING, 
results.getStatus().getState());
+        waitForJob(results);
+
+        Assert.assertEquals(JobStatus.State.SUCCEEDED, 
results.getStatus().getState());
+        Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+          try {
+            return HDFSUtils.readBytes(path);
+          } catch (IOException e) {
+            throw new IllegalStateException(e);
+          }
+        });
+        assertInOrder(bytes);
+        Assert.assertEquals(1, results.get().getSize());
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
         Assert.assertTrue(baos.toByteArray().length > 0);
       }
+
       System.out.println("Ended");
     } finally {
       runner.stop();
-      clearOutDir(outDir);
-      clearOutDir(queryDir);
+      clearOutDirs(inputDir, interimResultDir, outputDir);
     }
   }
 
-  private File getOutDir(String targetDir) {
-    File outDir = new File(new File(targetDir), DATA_DIR);
-    if (!outDir.exists()) {
-      outDir.mkdirs();
+  /**
+   * Polls the submitted job until it reports completion, sleeping SLEEP_MS
+   * milliseconds between checks.
+   *
+   * @param statusable handle of the job to poll
+   * @throws Exception if the job is still not done after MAX_RETRIES checks, or
+   *     if the sleep between checks is interrupted
+   */
+  private void waitForJob(Statusable statusable) throws Exception {
+    for (int t = 0; t < MAX_RETRIES; ++t, Thread.sleep(SLEEP_MS)) {
+      if (statusable.isDone()) {
+        return;
+      }
     }
-    return outDir;
+    // MAX_RETRIES * SLEEP_MS is a duration in milliseconds, so label it as such.
+    throw new Exception("Job did not complete within " + (MAX_RETRIES * SLEEP_MS) + " ms");
   }
 
-  private File getQueryDir(String targetDir) {
-    File outDir = new File(new File(targetDir), QUERY_DIR);
-    if (!outDir.exists()) {
-      outDir.mkdirs();
+  /**
+   * Resolves {@code childDir} beneath {@code targetDir}, creating the directory
+   * (and any missing parents) when it does not exist yet.
+   *
+   * @param targetDir path of the parent directory
+   * @param childDir name of the directory to resolve under the parent
+   * @return the resolved directory; the result of mkdirs() is not checked, so
+   *     its existence is not guaranteed
+   */
+  private File getDir(String targetDir, String childDir) {
+    File directory = new File(new File(targetDir), childDir);
+    if (!directory.exists()) {
+      directory.mkdirs();
     }
-    return outDir;
+    return directory;
   }
 
   private static Iterable<Map.Entry<byte[], byte[]>> readPcaps(Path pcapFile, 
boolean withHeaders) throws IOException {

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
 
b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
index 3468a7c..763f0c6 100644
--- 
a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
+++ 
b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
@@ -19,13 +19,9 @@ package org.apache.metron.pcap.query;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.BufferedOutputStream;
@@ -35,28 +31,25 @@ import java.io.PrintStream;
 import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import java.util.Map.Entry;
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.hadoop.SequenceFileIterable;
 import org.apache.metron.common.system.Clock;
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.job.Finalizer;
 import org.apache.metron.pcap.PcapHelper;
-import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
-import org.apache.metron.pcap.filter.query.QueryPcapFilter;
+import org.apache.metron.pcap.config.FixedPcapConfig;
+import org.apache.metron.pcap.config.PcapConfig.PrefixStrategy;
+import org.apache.metron.pcap.config.PcapOptions;
 import org.apache.metron.pcap.mr.PcapJob;
-import org.apache.metron.pcap.writer.ResultsWriter;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
-import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
 public class PcapCliTest {
@@ -64,16 +57,15 @@ public class PcapCliTest {
   @Mock
   private PcapJob jobRunner;
   @Mock
-  private ResultsWriter resultsWriter;
-  @Mock
   private Clock clock;
   private String execDir;
+  private PrefixStrategy prefixStrategy;
 
   @Before
   public void setup() throws IOException {
     MockitoAnnotations.initMocks(this);
-    doCallRealMethod().when(jobRunner).writeResults(anyObject(), anyObject(), 
anyObject(), anyInt(), anyObject());
     execDir = System.getProperty("user.dir");
+    prefixStrategy = clock -> "random_prefix";
   }
 
   @Test
@@ -88,13 +80,7 @@ public class PcapCliTest {
             "-protocol", "6",
             "-packet_filter", "`casey`"
     };
-    List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), 
asBytes("def"), asBytes("ghi")});
-    Iterator iterator = pcaps.iterator();
-    SequenceFileIterable iterable = mock(SequenceFileIterable.class);
-    when(iterable.iterator()).thenReturn(iterator);
 
-    Path base_path = new Path(CliParser.BASE_PATH_DEFAULT);
-    Path base_output_path = new Path(CliParser.BASE_OUTPUT_PATH_DEFAULT);
     HashMap<String, String> query = new HashMap<String, String>() {{
       put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
       put(Constants.Fields.DST_ADDR.getName(), "192.168.1.2");
@@ -104,12 +90,44 @@ public class PcapCliTest {
       put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
       put(PcapHelper.PacketFields.PACKET_FILTER.getName(), "`casey`");
     }};
+    FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
+    PcapOptions.BASE_PATH.put(config, CliParser.BASE_PATH_DEFAULT);
+    PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, 
CliParser.BASE_INTERIM_OUTPUT_PATH_DEFAULT);
+    PcapOptions.FIELDS.put(config, query);
+    PcapOptions.NUM_REDUCERS.put(config, 10);
+    PcapOptions.START_TIME_MS.put(config, 500L);
 
-    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), 
anyLong(), anyInt(), eq(query), isA(Configuration.class), 
isA(FileSystem.class), 
isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable);
+    when(jobRunner.submit(isA(Finalizer.class), 
argThat(mapContaining(config)))).thenReturn(jobRunner);
 
-    PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> 
"random_prefix");
+    PcapCli cli = new PcapCli(jobRunner, prefixStrategy);
     assertThat("Expect no errors on run", cli.run(args), equalTo(0));
-    Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), 
eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap"));
+    verify(jobRunner).get();
+  }
+
+  /**
+   * Builds a matcher that accepts any map containing every entry of {@code map}.
+   * Note, will not work for complex Objects where equals() does not compare
+   * contents favorably. e.g. Configurator() did not work.
+   *
+   * @param map entries that must all be present in the map under test
+   * @return matcher that passes when the tested map's entry set contains all of
+   *     {@code map}'s entries
+   */
+  private <K, V> Matcher<Map<K, V>> mapContaining(Map<K, V> map) {
+    return new TypeSafeMatcher<Map<K, V>>() {
+      @Override
+      protected boolean matchesSafely(Map<K, V> item) {
+        return item.entrySet().containsAll(map.entrySet());
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description.appendText("Should contain items: ");
+        // Separate entries so a mismatch report stays readable with several keys.
+        String separator = "";
+        for (Entry<K, V> entry : map.entrySet()) {
+          description.appendText(separator);
+          description.appendText("key=" + entry.getKey() + ",value=" + entry.getValue());
+          separator = "; ";
+        }
+      }
+    };
   }
 
   @Test
@@ -129,13 +147,6 @@ public class PcapCliTest {
             "-num_reducers", "10",
             "-records_per_file", "1000"
     };
-    List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), 
asBytes("def"), asBytes("ghi")});
-    Iterator iterator = pcaps.iterator();
-    SequenceFileIterable iterable = mock(SequenceFileIterable.class);
-    when(iterable.iterator()).thenReturn(iterator);
-
-    Path base_path = new Path("/base/path");
-    Path base_output_path = new Path("/base/output/path");
     Map<String, String> query = new HashMap<String, String>() {{
       put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
       put(Constants.Fields.DST_ADDR.getName(), "192.168.1.2");
@@ -144,12 +155,20 @@ public class PcapCliTest {
       put(Constants.Fields.PROTOCOL.getName(), "6");
       put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true");
     }};
-
-    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), 
anyLong(), anyInt(), eq(query), isA(Configuration.class), 
isA(FileSystem.class), 
isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable);
-
-    PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> 
"random_prefix");
+    FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
+    PcapOptions.BASE_PATH.put(config, "/base/path");
+    PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, "/base/output/path");
+    PcapOptions.FIELDS.put(config, query);
+    PcapOptions.NUM_REDUCERS.put(config, 10);
+    PcapOptions.START_TIME_MS.put(config, 500L);
+    PcapOptions.END_TIME_MS.put(config, 1000L);
+    PcapOptions.NUM_RECORDS_PER_FILE.put(config, 1000);
+
+    when(jobRunner.submit(isA(Finalizer.class), 
argThat(mapContaining(config)))).thenReturn(jobRunner);
+
+    PcapCli cli = new PcapCli(jobRunner, prefixStrategy);
     assertThat("Expect no errors on run", cli.run(args), equalTo(0));
-    Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), 
eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap"));
+    verify(jobRunner).get();
   }
 
   @Test
@@ -170,13 +189,6 @@ public class PcapCliTest {
             "-num_reducers", "10",
             "-records_per_file", "1000"
     };
-    List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), 
asBytes("def"), asBytes("ghi")});
-    Iterator iterator = pcaps.iterator();
-    SequenceFileIterable iterable = mock(SequenceFileIterable.class);
-    when(iterable.iterator()).thenReturn(iterator);
-
-    Path base_path = new Path("/base/path");
-    Path base_output_path = new Path("/base/output/path");
     Map<String, String> query = new HashMap<String, String>() {{
       put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
       put(Constants.Fields.DST_ADDR.getName(), "192.168.1.2");
@@ -188,11 +200,23 @@ public class PcapCliTest {
 
     long startAsNanos = asNanos("2016-06-13-18:35.00", "yyyy-MM-dd-HH:mm.ss");
     long endAsNanos = asNanos("2016-06-15-18:35.00", "yyyy-MM-dd-HH:mm.ss");
-    when(jobRunner.query(eq(base_path), eq(base_output_path), 
eq(startAsNanos), eq(endAsNanos), anyInt(), eq(query), 
isA(Configuration.class), isA(FileSystem.class), 
isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable);
 
-    PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> 
"random_prefix");
+    FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
+    PcapOptions.BASE_PATH.put(config, "/base/path");
+    PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, "/base/output/path");
+    PcapOptions.FIELDS.put(config, query);
+    PcapOptions.NUM_REDUCERS.put(config, 10);
+    PcapOptions.START_TIME_MS.put(config, startAsNanos / 1000000L); // needed 
bc defaults in config
+    PcapOptions.END_TIME_MS.put(config, endAsNanos / 1000000L);  // needed bc 
defaults in config
+    PcapOptions.START_TIME_NS.put(config, startAsNanos);
+    PcapOptions.END_TIME_NS.put(config, endAsNanos);
+    PcapOptions.NUM_RECORDS_PER_FILE.put(config, 1000);
+
+    when(jobRunner.submit(isA(Finalizer.class), 
argThat(mapContaining(config)))).thenReturn(jobRunner);
+
+    PcapCli cli = new PcapCli(jobRunner, prefixStrategy);
     assertThat("Expect no errors on run", cli.run(args), equalTo(0));
-    Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), 
eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap"));
+    verify(jobRunner).get();
   }
 
   private long asNanos(String inDate, String format) throws ParseException {
@@ -212,20 +236,20 @@ public class PcapCliTest {
             "-start_time", "500",
             "-query", "some query string"
     };
-    List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), 
asBytes("def"), asBytes("ghi")});
-    Iterator iterator = pcaps.iterator();
-    SequenceFileIterable iterable = mock(SequenceFileIterable.class);
-    when(iterable.iterator()).thenReturn(iterator);
 
-    Path base_path = new Path(CliParser.BASE_PATH_DEFAULT);
-    Path base_output_path = new Path(CliParser.BASE_OUTPUT_PATH_DEFAULT);
     String query = "some query string";
+    FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
+    PcapOptions.BASE_PATH.put(config, CliParser.BASE_PATH_DEFAULT);
+    PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, 
CliParser.BASE_INTERIM_OUTPUT_PATH_DEFAULT);
+    PcapOptions.FIELDS.put(config, query);
+    PcapOptions.NUM_REDUCERS.put(config, 10);
+    PcapOptions.START_TIME_MS.put(config, 500L);
 
-    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), 
anyLong(), anyInt(), eq(query), isA(Configuration.class), 
isA(FileSystem.class), 
isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable);
+    when(jobRunner.submit(isA(Finalizer.class), 
argThat(mapContaining(config)))).thenReturn(jobRunner);
 
-    PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> 
"random_prefix");
+    PcapCli cli = new PcapCli(jobRunner, prefixStrategy);
     assertThat("Expect no errors on run", cli.run(args), equalTo(0));
-    Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), 
eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap"));
+    verify(jobRunner).get();
   }
 
   @Test
@@ -240,20 +264,22 @@ public class PcapCliTest {
             "-query", "some query string",
             "-records_per_file", "1000"
     };
-    List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), 
asBytes("def"), asBytes("ghi")});
-    Iterator iterator = pcaps.iterator();
-    SequenceFileIterable iterable = mock(SequenceFileIterable.class);
-    when(iterable.iterator()).thenReturn(iterator);
 
-    Path base_path = new Path("/base/path");
-    Path base_output_path = new Path("/base/output/path");
     String query = "some query string";
-
-    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), 
anyLong(), anyInt(), eq(query), isA(Configuration.class), 
isA(FileSystem.class), 
isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable);
-
-    PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> 
"random_prefix");
+    FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
+    PcapOptions.BASE_PATH.put(config, "/base/path");
+    PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, "/base/output/path");
+    PcapOptions.FIELDS.put(config, query);
+    PcapOptions.NUM_REDUCERS.put(config, 10);
+    PcapOptions.START_TIME_MS.put(config, 500L); // needed bc defaults in 
config
+    PcapOptions.END_TIME_MS.put(config, 1000L);  // needed bc defaults in 
config
+    PcapOptions.NUM_RECORDS_PER_FILE.put(config, 1000);
+
+    when(jobRunner.submit(isA(Finalizer.class), 
argThat(mapContaining(config)))).thenReturn(jobRunner);
+
+    PcapCli cli = new PcapCli(jobRunner, prefixStrategy);
     assertThat("Expect no errors on run", cli.run(args), equalTo(0));
-    Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), 
eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap"));
+    verify(jobRunner).get();
   }
 
   // INVALID OPTION CHECKS
@@ -290,7 +316,7 @@ public class PcapCliTest {
       PrintStream errOutStream = new PrintStream(new 
BufferedOutputStream(ebos));
       System.setErr(errOutStream);
 
-      PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> 
"random_prefix");
+      PcapCli cli = new PcapCli(jobRunner, clock -> "random_prefix");
       assertThat("Expect errors on run", cli.run(args), equalTo(-1));
       assertThat("Expect missing required option error: " + ebos.toString(), 
ebos.toString().contains(optMsg), equalTo(true));
       assertThat("Expect usage to be printed: " + bos.toString(), 
bos.toString().contains("usage: " + type + " filter options"), equalTo(true));

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java
 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java
deleted file mode 100644
index 997c5f7..0000000
--- 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.pcap;
-
-import java.util.List;
-import org.apache.hadoop.fs.Path;
-import org.apache.metron.job.Pageable;
-
-public class PcapFiles implements Pageable<Path> {
-
-  private List<Path> files;
-
-  public PcapFiles(List<Path> files) {
-    this.files = files;
-  }
-
-  @Override
-  public Iterable<Path> asIterable() {
-    return files;
-  }
-
-  @Override
-  public Path getPage(int num) {
-    return files.get(num);
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPages.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPages.java
 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPages.java
new file mode 100644
index 0000000..c98e681
--- /dev/null
+++ 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPages.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.job.Pageable;
+
+/**
+ * List-backed {@link Pageable} of {@link Path}s, where each list element is one
+ * page.
+ */
+public class PcapPages implements Pageable<Path> {
+
+  private final List<Path> files;
+
+  /**
+   * Copy constructor. Each path is re-created from its string form, so the new
+   * instance shares no Path objects with {@code pages}.
+   *
+   * @param pages pages to copy
+   */
+  public PcapPages(Pageable<Path> pages) {
+    this.files = new ArrayList<>();
+    for (Path path : pages) {
+      files.add(new Path(path.toString()));
+    }
+  }
+
+  /**
+   * Defaults with empty list.
+   */
+  public PcapPages() {
+    this.files = new ArrayList<>();
+  }
+
+  /**
+   * Creates pages from a snapshot of {@code paths}; later changes to the given
+   * list are not reflected here.
+   *
+   * @param paths page paths, in page order
+   */
+  public PcapPages(List<Path> paths) {
+    files = new ArrayList<>(paths);
+  }
+
+  /**
+   * @param num zero-based page index
+   * @return the path stored at that index
+   */
+  @Override
+  public Path getPage(int num) {
+    return files.get(num);
+  }
+
+  /**
+   * @return the number of pages
+   */
+  @Override
+  public int getSize() {
+    return files.size();
+  }
+
+  /**
+   * @return an iterator over the pages in order; it exposes only hasNext/next,
+   *     so remove() falls back to the Iterator default instead of reaching the
+   *     backing list
+   */
+  @Override
+  public Iterator<Path> iterator() {
+    return new PcapIterator(files.iterator());
+  }
+
+  /**
+   * Forwarding iterator. Declared static because it never touches the enclosing
+   * PcapPages instance and so should not hold a reference to it.
+   */
+  private static final class PcapIterator implements Iterator<Path> {
+
+    private final Iterator<Path> delegateIt;
+
+    public PcapIterator(Iterator<Path> iterator) {
+      this.delegateIt = iterator;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return delegateIt.hasNext();
+    }
+
+    @Override
+    public Path next() {
+      return delegateIt.next();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/FixedPcapConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/FixedPcapConfig.java
 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/FixedPcapConfig.java
new file mode 100644
index 0000000..c40407b
--- /dev/null
+++ 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/FixedPcapConfig.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.config;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * {@link PcapConfig} whose FIELDS option holds a map of fixed filter fields
+ * (field name to value).
+ */
+public class FixedPcapConfig extends PcapConfig {
+
+  /**
+   * Creates a config with the base defaults and an empty, insertion-ordered
+   * fixed-field map.
+   *
+   * @param prefixStrategy strategy passed through to {@link PcapConfig}
+   */
+  public FixedPcapConfig(PrefixStrategy prefixStrategy) {
+    super(prefixStrategy);
+    setFixedFields(new LinkedHashMap<>());
+  }
+
+  /**
+   * @return the map stored under the FIELDS option
+   */
+  public Map<String, String> getFixedFields() {
+    return PcapOptions.FIELDS.get(this, Map.class);
+  }
+
+  /**
+   * Replaces the map stored under the FIELDS option.
+   *
+   * @param fixedFields new field map
+   */
+  public void setFixedFields(Map<String, String> fixedFields) {
+    PcapOptions.FIELDS.put(this, fixedFields);
+  }
+
+  /**
+   * Adds a fixed field, ignoring values that are null, empty or whitespace-only.
+   * The value is stored trimmed, i.e. exactly the form that was validated.
+   *
+   * @param key field name
+   * @param value field value; surrounding whitespace is dropped
+   */
+  public void putFixedField(String key, String value) {
+    Map<String, String> fixedFields = PcapOptions.FIELDS.get(this, Map.class);
+    String trimmedVal = value != null ? value.trim() : null;
+    if (!isNullOrEmpty(trimmedVal)) {
+      // Store the trimmed form; the original stored the raw value and threw the trim away.
+      fixedFields.put(key, trimmedVal);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java
 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java
new file mode 100644
index 0000000..26509be
--- /dev/null
+++ 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.config;
+
+import org.apache.commons.collections4.map.AbstractMapDecorator;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.common.system.Clock;
+import org.apache.metron.common.configuration.ConfigOption;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.function.Function;
+
+/**
+ * Map-backed pcap configuration: a decorator around a HashMap with typed
+ * accessors that go through {@link PcapOptions}. The show-help flag and the date
+ * format are plain fields and are not stored as map entries.
+ * NOTE(review): the option put/get helpers are presumably keyed by each option's
+ * key in this map -- confirm against ConfigOption.
+ */
+public class PcapConfig extends AbstractMapDecorator<String, Object>{
+  /** Produces the final filename prefix from a {@link Clock}. */
+  public interface PrefixStrategy extends Function<Clock, String>{}
+
+  private boolean showHelp;
+  // Stays null until setDateFormat(String) is called.
+  private DateFormat dateFormat;
+
+  /** Creates a config over an empty HashMap; no option defaults are applied. */
+  public PcapConfig() {
+    super(new HashMap<>());
+  }
+
+  /**
+   * Creates a config with defaults: help off, empty base paths, start/end time
+   * of -1, zero reducers, and a filename prefix obtained by applying
+   * {@code prefixStrategy} to a new Clock. No default is set here for the
+   * records-per-file option.
+   *
+   * @param prefixStrategy supplies the final filename prefix
+   */
+  public PcapConfig(PrefixStrategy prefixStrategy) {
+    this();
+    setShowHelp(false);
+    setBasePath("");
+    setBaseInterimResultPath("");
+    setStartTimeMs(-1L);
+    setEndTimeMs(-1L);
+    setNumReducers(0);
+    setFinalFilenamePrefix(prefixStrategy.apply(new Clock()));
+  }
+
+  /**
+   * Looks up the raw value stored under the option's key and returns it after
+   * applying the option's transform.
+   *
+   * @param option option to read
+   * @return the transformed value
+   */
+  public Object getOption(ConfigOption option) {
+    Object o = get(option.getKey());
+    return option.transform().apply(option.getKey(), o);
+  }
+
+  public String getFinalFilenamePrefix() {
+    return PcapOptions.FINAL_FILENAME_PREFIX.get(this, String.class);
+  }
+
+  public void setFinalFilenamePrefix(String prefix) {
+    PcapOptions.FINAL_FILENAME_PREFIX.put(this, prefix);
+  }
+
+  // NOTE(review): the primitive-returning getters unbox the looked-up value;
+  // presumably they throw NullPointerException when the option is unset -- confirm.
+  public int getNumReducers() {
+    return PcapOptions.NUM_REDUCERS.get(this, Integer.class);
+  }
+
+  public boolean showHelp() {
+    return showHelp;
+  }
+
+  public void setShowHelp(boolean showHelp) {
+    this.showHelp = showHelp;
+  }
+
+  public String getBasePath() {
+    return PcapOptions.BASE_PATH.get(this, String.class);
+  }
+
+  public String getBaseInterimResultPath() {
+    return PcapOptions.BASE_INTERIM_RESULT_PATH.get(this, String.class);
+  }
+
+  public long getStartTimeMs() {
+    return PcapOptions.START_TIME_MS.get(this, Long.class);
+  }
+
+  public long getEndTimeMs() {
+    return PcapOptions.END_TIME_MS.get(this, Long.class);
+  }
+
+  public void setBasePath(String basePath) {
+    PcapOptions.BASE_PATH.put(this, basePath);
+  }
+
+  public void setBaseInterimResultPath(String baseOutputPath) {
+    PcapOptions.BASE_INTERIM_RESULT_PATH.put(this, baseOutputPath);
+  }
+
+  public void setStartTimeMs(long startTime) {
+    PcapOptions.START_TIME_MS.put(this, startTime);
+  }
+
+  public void setEndTimeMs(long endTime) {
+    PcapOptions.END_TIME_MS.put(this, endTime);
+  }
+
+  /** Delegates to StringUtils.isEmpty: true for null or the empty string. */
+  public boolean isNullOrEmpty(String val) {
+    return StringUtils.isEmpty(val);
+  }
+
+  /**
+   * Sets the date format from a SimpleDateFormat pattern.
+   *
+   * @param dateFormat SimpleDateFormat pattern string
+   */
+  public void setDateFormat(String dateFormat) {
+    this.dateFormat = new SimpleDateFormat(dateFormat);
+  }
+
+  /** Returns the configured date format, or null if none has been set. */
+  public DateFormat getDateFormat() {
+    return dateFormat;
+  }
+
+  public void setNumReducers(int numReducers) {
+    PcapOptions.NUM_REDUCERS.put(this, numReducers);
+  }
+
+  // NOTE(review): neither constructor sets this option, so it must be set via
+  // setNumRecordsPerFile (or directly in the map) before this getter is used.
+  public int getNumRecordsPerFile() {
+    return PcapOptions.NUM_RECORDS_PER_FILE.get(this, Integer.class);
+  }
+
+  public void setNumRecordsPerFile(int numRecordsPerFile) {
+    PcapOptions.NUM_RECORDS_PER_FILE.put(this, numRecordsPerFile);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
new file mode 100644
index 0000000..09effd4
--- /dev/null
+++ 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.config;
+
+import java.util.function.BiFunction;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.common.configuration.ConfigOption;
+
+/**
+ * Keys of the PCAP job configuration map, each paired with a transform function.
+ *
+ * <p>The path-valued options ({@link #BASE_PATH}, {@link #INTERIM_RESULT_PATH},
+ * {@link #BASE_INTERIM_RESULT_PATH}, {@link #FINAL_OUTPUT_PATH}) carry a transform that turns a
+ * non-null value into a Hadoop {@link Path} via its {@code toString()}; every other option uses
+ * an identity transform.
+ */
+public enum PcapOptions implements ConfigOption {
+  JOB_NAME("jobName"),
+  FINAL_FILENAME_PREFIX("finalFilenamePrefix"),
+  BASE_PATH("basePath", PcapOptions::toPath),
+  INTERIM_RESULT_PATH("interimResultPath", PcapOptions::toPath),
+  BASE_INTERIM_RESULT_PATH("baseInterimResultPath", PcapOptions::toPath),
+  FINAL_OUTPUT_PATH("finalOutputPath", PcapOptions::toPath),
+  NUM_REDUCERS("numReducers"),
+  START_TIME_MS("startTimeMs"),
+  END_TIME_MS("endTimeMs"),
+  START_TIME_NS("startTimeNs"),
+  END_TIME_NS("endTimeNs"),
+  NUM_RECORDS_PER_FILE("numRecordsPerFile"),
+  FIELDS("fields"),
+  FILTER_IMPL("filterImpl"),
+  HADOOP_CONF("hadoopConf"),
+  FILESYSTEM("fileSystem");
+
+  /**
+   * Converts a config value to a {@link Path} ({@code null} stays {@code null}). Same conversion
+   * the path-valued options use as their transform, exposed for callers that want a typed
+   * {@code Path} result.
+   */
+  public static final BiFunction<String, Object, Path> STRING_TO_PATH = PcapOptions::toPath;
+
+  private final String key;
+  private final BiFunction<String, Object, Object> transform;
+
+  /**
+   * Creates an option whose transform returns the value unchanged.
+   *
+   * @param key name under which the option is stored in the config map
+   */
+  PcapOptions(String key) {
+    this(key, (s, o) -> o);
+  }
+
+  /**
+   * Creates an option with an explicit transform.
+   *
+   * @param key name under which the option is stored in the config map
+   * @param transform function of (key, raw value) returned by {@link #transform()}
+   */
+  PcapOptions(String key, BiFunction<String, Object, Object> transform) {
+    this.key = key;
+    this.transform = transform;
+  }
+
+  /**
+   * Single definition of the value-to-Path conversion. It is a method rather than a static field
+   * because enum constant arguments are evaluated before the enum's static fields are
+   * initialized, so the constants above cannot refer to {@link #STRING_TO_PATH}.
+   *
+   * @param key option key (unused by the conversion)
+   * @param value raw config value, may be {@code null}
+   * @return {@code null} for a {@code null} value, otherwise {@code new Path(value.toString())}
+   */
+  private static Path toPath(String key, Object value) {
+    return value == null ? null : new Path(value.toString());
+  }
+
+  @Override
+  public String getKey() {
+    return key;
+  }
+
+  @Override
+  public BiFunction<String, Object, Object> transform() {
+    return transform;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/QueryPcapConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/QueryPcapConfig.java
 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/QueryPcapConfig.java
new file mode 100644
index 0000000..ef32839
--- /dev/null
+++ 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/QueryPcapConfig.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.config;
+
+/**
+ * {@link PcapConfig} that exposes the {@code FIELDS} option as a single query {@code String}.
+ */
+public class QueryPcapConfig extends PcapConfig {
+
+  /**
+   * @param prefixStrategy passed straight through to the {@link PcapConfig} constructor
+   */
+  public QueryPcapConfig(PrefixStrategy prefixStrategy) {
+    super(prefixStrategy);
+  }
+
+  /**
+   * Stores the query under the {@code FIELDS} option of this config.
+   *
+   * @param query query text to store
+   */
+  public void setQuery(String query) {
+    final PcapOptions option = PcapOptions.FIELDS;
+    option.put(this, query);
+  }
+
+  /**
+   * Looks up the {@code FIELDS} option in this config.
+   *
+   * @return the stored value, requested from the option as a {@code String}
+   */
+  public String getQuery() {
+    final String query = PcapOptions.FIELDS.get(this, String.class);
+    return query;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
new file mode 100644
index 0000000..e032158
--- /dev/null
+++ 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.finalizer;
+
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.pcap.config.PcapOptions;
+
+/**
+ * Write to local FS. Supplies the CLI naming scheme for the files produced by
+ * {@link PcapFinalizer}.
+ */
+public class PcapCliFinalizer extends PcapFinalizer {
+
+  /**
+   * Output names follow
+   * &lt;output-path&gt;/pcap-data-&lt;filename-prefix&gt;+&lt;partition-num&gt;.pcap, with the
+   * partition number zero-padded to four digits. The filename prefix is pluggable, but in most
+   * cases it will be provided via the PcapConfig as a formatted timestamp + uuid. A final sample
+   * name looks as follows:
+   * /base/output/path/pcap-data-201807181911-09855b4ae3204dee8b63760d65198da3+0001.pcap
+   */
+  private static final String PCAP_CLI_FILENAME_FORMAT = "%s/pcap-data-%s+%04d.pcap";
+
+  /**
+   * Builds the name of the output file for one partition.
+   *
+   * @param config job configuration; read for {@code FINAL_OUTPUT_PATH} (converted with
+   *     {@code PcapOptions.STRING_TO_PATH}) and {@code FINAL_FILENAME_PREFIX}
+   * @param partition partition number placed in the name
+   * @return the formatted output file name
+   */
+  @Override
+  protected String getOutputFileName(Map<String, Object> config, int partition) {
+    Path outputDir =
+        PcapOptions.FINAL_OUTPUT_PATH.get(config, PcapOptions.STRING_TO_PATH, Path.class);
+    String filenamePrefix = PcapOptions.FINAL_FILENAME_PREFIX.get(config, String.class);
+    Object[] nameParts = {outputDir, filenamePrefix, partition};
+    return String.format(PCAP_CLI_FILENAME_FORMAT, nameParts);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
new file mode 100644
index 0000000..d5ac675
--- /dev/null
+++ 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.finalizer;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.metron.common.hadoop.SequenceFileIterable;
+import org.apache.metron.job.Finalizer;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.Pageable;
+import org.apache.metron.pcap.PcapPages;
+import org.apache.metron.pcap.config.PcapOptions;
+import org.apache.metron.pcap.writer.PcapResultsWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Takes Pcap results from a specified path - for PCAP, it is assumed that these results are
+ * SequenceFileIterables. The results are partitioned based on the "num records per file" option,
+ * and each partition is handed to the {@link PcapResultsWriter} to be written as one final output
+ * file at the name chosen by the subclass. The interim MapReduce results are cleaned up when
+ * finalization ends, whether or not the final results were written successfully.
+ */
+public abstract class PcapFinalizer implements Finalizer<Path> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final PcapResultsWriter resultsWriter;
+
+  protected PcapFinalizer() {
+    this.resultsWriter = new PcapResultsWriter();
+  }
+
+  /**
+   * @return the writer used to persist each partition; overridable so subclasses can substitute
+   *     their own writer
+   */
+  protected PcapResultsWriter getResultsWriter() {
+    return resultsWriter;
+  }
+
+  /**
+   * Reads the interim results, splits them into partitions of at most
+   * {@code NUM_RECORDS_PER_FILE} records and writes one output file per partition.
+   *
+   * @param config job configuration; read for {@code HADOOP_CONF}, {@code NUM_RECORDS_PER_FILE},
+   *     {@code INTERIM_RESULT_PATH} and {@code FILESYSTEM}, and passed on to
+   *     {@link #getOutputFileName(Map, int)}
+   * @return pages wrapping the paths of the files that were written, in partition order
+   * @throws JobException if the interim results cannot be listed or a partition cannot be
+   *     written
+   */
+  @Override
+  public Pageable<Path> finalizeJob(Map<String, Object> config) throws JobException {
+    Configuration hadoopConfig = PcapOptions.HADOOP_CONF.get(config, Configuration.class);
+    int recPerFile = PcapOptions.NUM_RECORDS_PER_FILE.get(config, Integer.class);
+    Path interimResultPath = PcapOptions.INTERIM_RESULT_PATH
+        .get(config, PcapOptions.STRING_TO_PATH, Path.class);
+    FileSystem fs = PcapOptions.FILESYSTEM.get(config, FileSystem.class);
+
+    SequenceFileIterable interimResults;
+    try {
+      interimResults = readInterimResults(interimResultPath, hadoopConfig, fs);
+    } catch (IOException e) {
+      throw new JobException("Unable to read interim job results while finalizing", e);
+    }
+    List<Path> outFiles = new ArrayList<>();
+    try {
+      // Walk the lazily-read results exactly once; probing a separate iterator just to test for
+      // emptiness would start a second read of the sequence files.
+      int part = 1;
+      for (List<byte[]> data : Iterables.partition(interimResults, recPerFile)) {
+        String outFileName = getOutputFileName(config, part++);
+        if (!data.isEmpty()) {
+          getResultsWriter().write(hadoopConfig, data, outFileName);
+          outFiles.add(new Path(outFileName));
+        }
+      }
+      // part only advances inside the loop, so it is still 1 when there were no partitions.
+      if (part == 1) {
+        LOG.info("No results returned.");
+      }
+    } catch (IOException e) {
+      throw new JobException("Failed to finalize results", e);
+    } finally {
+      // Best effort: a failed cleanup must not mask the outcome of the write phase.
+      try {
+        interimResults.cleanup();
+      } catch (IOException e) {
+        LOG.warn("Unable to cleanup files in HDFS", e);
+      }
+    }
+    return new PcapPages(outFiles);
+  }
+
+  /**
+   * Chooses the name of the final output file for one partition.
+   *
+   * @param config job configuration
+   * @param partition partition number; {@link #finalizeJob(Map)} starts counting at 1
+   * @return full name of the file the partition is written to
+   */
+  protected abstract String getOutputFileName(Map<String, Object> config, int partition);
+
+  /**
+   * Returns a lazily-read Iterable over a set of sequence files.
+   *
+   * <p>Lists the files directly under {@code interimResultPath} (non-recursively), deletes any
+   * file named {@code _SUCCESS} and orders the remaining files by name.
+   *
+   * @param interimResultPath directory holding the interim results
+   * @param config Hadoop configuration handed to the returned iterable
+   * @param fs file system used to list (and delete from) the directory
+   * @return iterable over the remaining files, possibly over an empty list
+   * @throws IOException if listing or deleting fails
+   */
+  protected SequenceFileIterable readInterimResults(Path interimResultPath, Configuration config,
+      FileSystem fs) throws IOException {
+    List<Path> files = new ArrayList<>();
+    for (RemoteIterator<LocatedFileStatus> it = fs.listFiles(interimResultPath, false);
+        it.hasNext(); ) {
+      Path p = it.next().getPath();
+      if (p.getName().equals("_SUCCESS")) {
+        fs.delete(p, false);
+        continue;
+      }
+      files.add(p);
+    }
+    if (files.isEmpty()) {
+      LOG.info("No files to process with specified date range.");
+    } else {
+      LOG.debug("Interim results path={}", interimResultPath);
+      Collections.sort(files, (o1, o2) -> o1.getName().compareTo(o2.getName()));
+    }
+    return new SequenceFileIterable(files, config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizerStrategies.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizerStrategies.java
 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizerStrategies.java
new file mode 100644
index 0000000..927d602
--- /dev/null
+++ 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizerStrategies.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.finalizer;
+
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.job.Finalizer;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.Pageable;
+
+/**
+ * PcapJob runs a MapReduce job that outputs Sequence Files to HDFS. This Strategy/Factory class
+ * provides options for doing final processing on this raw MapReduce output for the CLI and REST
+ * API's. Each constant wraps one finalizer instance and forwards {@link #finalizeJob(Map)} to it.
+ */
+public enum PcapFinalizerStrategies implements Finalizer<Path> {
+  CLI(new PcapCliFinalizer()),
+  REST(new PcapRestFinalizer());
+
+  // Enum constants are shared singletons, so the delegate is fixed at construction.
+  private final Finalizer<Path> finalizer;
+
+  PcapFinalizerStrategies(Finalizer<Path> finalizer) {
+    this.finalizer = finalizer;
+  }
+
+  /**
+   * Forwards to the finalizer wrapped by this constant.
+   *
+   * @param config job configuration, passed through unchanged
+   * @return whatever the wrapped finalizer returns
+   * @throws JobException if the wrapped finalizer throws it
+   */
+  @Override
+  public Pageable<Path> finalizeJob(Map<String, Object> config) throws JobException {
+    return finalizer.finalizeJob(config);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
new file mode 100644
index 0000000..059bba2
--- /dev/null
+++ 
b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.finalizer;
+
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.pcap.config.PcapOptions;
+
+/**
+ * Write to HDFS. Supplies the REST naming scheme for the files produced by
+ * {@link PcapFinalizer}.
+ */
+public class PcapRestFinalizer extends PcapFinalizer {
+
+  /**
+   * Output names follow &lt;output-path&gt;/page-&lt;page-num&gt;.pcap, where the page number is
+   * the partition number given to {@link #getOutputFileName(Map, int)}, without padding. A
+   * sample name looks as follows:
+   * /base/output/path/page-1.pcap
+   */
+  private static final String PCAP_REST_FILENAME_FORMAT = "%s/page-%s.pcap";
+
+  /**
+   * Builds the name of the output file for one partition (page).
+   *
+   * @param config job configuration; read for {@code FINAL_OUTPUT_PATH} via
+   *     {@code getTransformed}
+   * @param partition partition number used as the page number
+   * @return the formatted output file name
+   */
+  @Override
+  protected String getOutputFileName(Map<String, Object> config, int partition) {
+    Path finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.getTransformed(config, Path.class);
+    return String.format(PCAP_REST_FILENAME_FORMAT, finalOutputPath, partition);
+  }
+
+}

Reply via email to