http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index c7292ab..9ea7912 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -36,6 +36,7 @@ import java.util.Map;
 import java.util.Properties;
 import javax.annotation.Nullable;
 import kafka.consumer.ConsumerIterator;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +46,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.metron.common.Constants;
+import org.apache.metron.common.utils.HDFSUtils;
 import org.apache.metron.integration.BaseIntegrationTest;
 import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.Processor;
@@ -55,12 +57,18 @@ import org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.integration.components.MRComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
 import org.apache.metron.integration.utils.KafkaUtil;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.Statusable;
 import org.apache.metron.pcap.PacketInfo;
 import org.apache.metron.pcap.PcapHelper;
 import org.apache.metron.pcap.PcapMerger;
+import org.apache.metron.pcap.config.FixedPcapConfig;
+import org.apache.metron.pcap.config.PcapOptions;
 import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
 import org.apache.metron.pcap.filter.query.QueryPcapFilter;
+import org.apache.metron.pcap.finalizer.PcapFinalizerStrategies;
 import org.apache.metron.pcap.mr.PcapJob;
+import org.apache.metron.pcap.query.PcapCli;
 import org.apache.metron.spout.pcap.Endianness;
 import org.apache.metron.spout.pcap.deserializer.Deserializers;
 import org.apache.metron.test.utils.UnitTestHelper;
@@ -73,13 +81,22 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
   final static String KAFKA_TOPIC = "pcap";
   private static String BASE_DIR = "pcap";
   private static String DATA_DIR = BASE_DIR + "/data_dir";
-  private static String QUERY_DIR = BASE_DIR + "/query";
+  private static String INTERIM_RESULT = BASE_DIR + "/query";
+  private static String OUTPUT_DIR = BASE_DIR + "/output";
+  private static final int MAX_RETRIES = 30;
+  private static final int SLEEP_MS = 500;
   private String topologiesDir = "src/main/flux";
   private String targetDir = "target";

-  private static void clearOutDir(File outDir) {
-    for(File f : outDir.listFiles()) {
-      f.delete();
+  private static void clearOutDirs(File... dirs) throws IOException {
+    for(File dir: dirs) {
+      for(File f : dir.listFiles()) {
+        if (f.isDirectory()) {
+          FileUtils.deleteDirectory(f);
+        } else {
+          f.delete();
+        }
+      }
     }
   }

   private static int numFiles(File outDir, Configuration config) {
@@ -158,10 +175,10 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
       topologiesDir = UnitTestHelper.findDir("topologies");
     }
     targetDir = UnitTestHelper.findDir("target");
-    final File outDir = getOutDir(targetDir);
-    final File queryDir = getQueryDir(targetDir);
-    clearOutDir(outDir);
-    clearOutDir(queryDir);
+    final File inputDir = getDir(targetDir, DATA_DIR);
+    final File interimResultDir = getDir(targetDir, INTERIM_RESULT);
+    final File outputDir = getDir(targetDir, OUTPUT_DIR);
+    clearOutDirs(inputDir, interimResultDir, outputDir);
     File baseDir = new File(new File(targetDir), BASE_DIR);
     //Assert.assertEquals(0, numFiles(outDir));
@@ -175,7 +192,7 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
     setProperty("topology.worker.childopts", "");
     setProperty("spout.kafka.topic.pcap", KAFKA_TOPIC);
     setProperty("kafka.pcap.start", "EARLIEST");
-    setProperty("kafka.pcap.out", outDir.getAbsolutePath());
+    setProperty("kafka.pcap.out", inputDir.getAbsolutePath());
     setProperty("kafka.pcap.numPackets", "2");
     setProperty("kafka.pcap.maxTimeMS", "200000000");
     setProperty("kafka.pcap.ts_granularity", "NANOSECONDS");
@@ -219,7 +236,7 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
     runner.process(new Processor<Void>() {
       @Override
       public ReadinessState process(ComponentRunner runner) {
-        int numFiles = numFiles(outDir, mr.getConfiguration());
+        int numFiles = numFiles(inputDir, mr.getConfiguration());
         int expectedNumFiles = pcapEntries.size() / 2;
         if (numFiles == expectedNumFiles) {
           return ReadinessState.READY;
@@ -233,160 +250,222 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
         return null;
       }
     });
-    PcapJob job = new PcapJob();
+
+    FixedPcapConfig configuration = new FixedPcapConfig(PcapCli.PREFIX_STRATEGY);
+    Configuration hadoopConf = new Configuration();
+    PcapOptions.JOB_NAME.put(configuration, "jobName");
+    PcapOptions.HADOOP_CONF.put(configuration, hadoopConf);
+    PcapOptions.FILESYSTEM.put(configuration, FileSystem.get(hadoopConf));
+    PcapOptions.BASE_PATH.put(configuration, new Path(inputDir.getAbsolutePath()));
+    PcapOptions.BASE_INTERIM_RESULT_PATH.put(configuration, new Path(interimResultDir.getAbsolutePath()));
+    PcapOptions.START_TIME_NS.put(configuration, getTimestamp(4, pcapEntries));
+    PcapOptions.END_TIME_NS.put(configuration, getTimestamp(5, pcapEntries));
+    PcapOptions.NUM_REDUCERS.put(configuration, 10);
+    PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 2);
+    PcapOptions.FINAL_OUTPUT_PATH.put(configuration, new Path(outputDir.getAbsolutePath()));
     {
       //Ensure that only two pcaps are returned when we look at 4 and 5
-      Iterable<byte[]> results =
-          job.query(new Path(outDir.getAbsolutePath())
-              , new Path(queryDir.getAbsolutePath())
-              , getTimestamp(4, pcapEntries)
-              , getTimestamp(5, pcapEntries)
-              , 10
-              , new HashMap<>()
-              , new Configuration()
-              , FileSystem.get(new Configuration())
-              , new FixedPcapFilter.Configurator()
-          );
-      assertInOrder(results);
-      Assert.assertEquals(Iterables.size(results), 2);
+      PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
+      PcapOptions.FIELDS.put(configuration, new HashMap());
+      PcapJob<Map<String, String>> job = new PcapJob<>();
+      Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+      Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+      Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+      waitForJob(results);
+
+      Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+      Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+        try {
+          return HDFSUtils.readBytes(path);
+        } catch (IOException e) {
+          throw new IllegalStateException(e);
+        }
+      });
+      assertInOrder(bytes);
+      Assert.assertEquals(results.get().getSize(), 1);
     }
     {
       // Ensure that only two pcaps are returned when we look at 4 and 5
       // test with empty query filter
-      Iterable<byte[]> results =
-          job.query(new Path(outDir.getAbsolutePath())
-              , new Path(queryDir.getAbsolutePath())
-              , getTimestamp(4, pcapEntries)
-              , getTimestamp(5, pcapEntries)
-              , 10
-              , ""
-              , new Configuration()
-              , FileSystem.get(new Configuration())
-              , new QueryPcapFilter.Configurator()
-          );
-      assertInOrder(results);
-      Assert.assertEquals(Iterables.size(results), 2);
+      PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+      PcapOptions.FIELDS.put(configuration, "");
+      PcapJob<String> job = new PcapJob<>();
+      Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+      Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+      Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+      waitForJob(results);
+
+      Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+      Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+        try {
+          return HDFSUtils.readBytes(path);
+        } catch (IOException e) {
+          throw new IllegalStateException(e);
+        }
+      });
+      assertInOrder(bytes);
+      Assert.assertEquals(results.get().getSize(), 1);
     }
     {
       //ensure that none get returned since that destination IP address isn't in the dataset
-      Iterable<byte[]> results =
-          job.query(new Path(outDir.getAbsolutePath())
-              , new Path(queryDir.getAbsolutePath())
-              , getTimestamp(0, pcapEntries)
-              , getTimestamp(1, pcapEntries)
-              , 10
-              , new HashMap<String, String>() {{
-                put(Constants.Fields.DST_ADDR.getName(), "207.28.210.1");
-              }}
-              , new Configuration()
-              , FileSystem.get(new Configuration())
-              , new FixedPcapFilter.Configurator()
-          );
-      assertInOrder(results);
-      Assert.assertEquals(Iterables.size(results), 0);
+      PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
+      PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
+        put(Constants.Fields.DST_ADDR.getName(), "207.28.210.1");
+      }});
+      PcapJob<Map<String, String>> job = new PcapJob<>();
+      Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+      Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+      Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+      waitForJob(results);
+
+      Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+      Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+        try {
+          return HDFSUtils.readBytes(path);
+        } catch (IOException e) {
+          throw new IllegalStateException(e);
+        }
+      });
+      assertInOrder(bytes);
+      Assert.assertEquals(results.get().getSize(), 0);
    }
    {
      // ensure that none get returned since that destination IP address isn't in the dataset
      // test with query filter
-      Iterable<byte[]> results =
-          job.query(new Path(outDir.getAbsolutePath())
-              , new Path(queryDir.getAbsolutePath())
-              , getTimestamp(0, pcapEntries)
-              , getTimestamp(1, pcapEntries)
-              , 10
-              , "ip_dst_addr == '207.28.210.1'"
-              , new Configuration()
-              , FileSystem.get(new Configuration())
-              , new QueryPcapFilter.Configurator()
-          );
-      assertInOrder(results);
-      Assert.assertEquals(Iterables.size(results), 0);
+      PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+      PcapOptions.FIELDS.put(configuration, "ip_dst_addr == '207.28.210.1'");
+      PcapJob<String> job = new PcapJob<>();
+      Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+      Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+      Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+      waitForJob(results);
+
+      Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+      Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+        try {
+          return HDFSUtils.readBytes(path);
+        } catch (IOException e) {
+          throw new IllegalStateException(e);
+        }
+      });
+      assertInOrder(bytes);
+      Assert.assertEquals(results.get().getSize(), 0);
    }
    {
      //same with protocol as before with the destination addr
-      Iterable<byte[]> results =
-          job.query(new Path(outDir.getAbsolutePath())
-              , new Path(queryDir.getAbsolutePath())
-              , getTimestamp(0, pcapEntries)
-              , getTimestamp(1, pcapEntries)
-              , 10
-              , new HashMap<String, String>() {{
-                put(Constants.Fields.PROTOCOL.getName(), "foo");
-              }}
-              , new Configuration()
-              , FileSystem.get(new Configuration())
-              , new FixedPcapFilter.Configurator()
-          );
-      assertInOrder(results);
-      Assert.assertEquals(Iterables.size(results), 0);
+      PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
+      PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
+        put(Constants.Fields.PROTOCOL.getName(), "foo");
+      }});
+      PcapJob<Map<String, String>> job = new PcapJob<>();
+      Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+      Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+      Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+      waitForJob(results);
+
+      Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+      Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+        try {
+          return HDFSUtils.readBytes(path);
+        } catch (IOException e) {
+          throw new IllegalStateException(e);
+        }
+      });
+      assertInOrder(bytes);
+      Assert.assertEquals(results.get().getSize(), 0);
    }
    {
      //same with protocol as before with the destination addr
      //test with query filter
-      Iterable<byte[]> results =
-          job.query(new Path(outDir.getAbsolutePath())
-              , new Path(queryDir.getAbsolutePath())
-              , getTimestamp(0, pcapEntries)
-              , getTimestamp(1, pcapEntries)
-              , 10
-              , "protocol == 'foo'"
-              , new Configuration()
-              , FileSystem.get(new Configuration())
-              , new QueryPcapFilter.Configurator()
-          );
-      assertInOrder(results);
-      Assert.assertEquals(Iterables.size(results), 0);
+      PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+      PcapOptions.FIELDS.put(configuration, "protocol == 'foo'");
+      PcapJob<String> job = new PcapJob<>();
+      Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+      Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+      Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+      waitForJob(results);
+
+      Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+      Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+        try {
+          return HDFSUtils.readBytes(path);
+        } catch (IOException e) {
+          throw new IllegalStateException(e);
+        }
+      });
+      assertInOrder(bytes);
+      Assert.assertEquals(results.get().getSize(), 0);
    }
    {
      //make sure I get them all.
-      Iterable<byte[]> results =
-          job.query(new Path(outDir.getAbsolutePath())
-              , new Path(queryDir.getAbsolutePath())
-              , getTimestamp(0, pcapEntries)
-              , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
-              , 10
-              , new HashMap<>()
-              , new Configuration()
-              , FileSystem.get(new Configuration())
-              , new FixedPcapFilter.Configurator()
-          );
-      assertInOrder(results);
-      Assert.assertEquals(Iterables.size(results), pcapEntries.size());
+      PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
+      PcapOptions.FIELDS.put(configuration, new HashMap<>());
+      PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+      PcapOptions.END_TIME_NS.put(configuration, getTimestamp(pcapEntries.size()-1, pcapEntries) + 1);
+      PcapJob<Map<String, String>> job = new PcapJob<>();
+      Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+      Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+      Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+      waitForJob(results);
+
+      Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+      Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+        try {
+          return HDFSUtils.readBytes(path);
+        } catch (IOException e) {
+          throw new IllegalStateException(e);
+        }
+      });
+      assertInOrder(bytes);
+      Assert.assertEquals(10, results.get().getSize());
    }
    {
      //make sure I get them all.
      //with query filter
-      Iterable<byte[]> results =
-          job.query(new Path(outDir.getAbsolutePath())
-              , new Path(queryDir.getAbsolutePath())
-              , getTimestamp(0, pcapEntries)
-              , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
-              , 10
-              , ""
-              , new Configuration()
-              , FileSystem.get(new Configuration())
-              , new QueryPcapFilter.Configurator()
-          );
-      assertInOrder(results);
-      Assert.assertEquals(Iterables.size(results), pcapEntries.size());
+      PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+      PcapOptions.FIELDS.put(configuration, "");
+      PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+      PcapOptions.END_TIME_NS.put(configuration, getTimestamp(pcapEntries.size()-1, pcapEntries) + 1);
+      PcapJob<String> job = new PcapJob<>();
+      Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+      Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+      Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+      waitForJob(results);
+
+      Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+      Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+        try {
+          return HDFSUtils.readBytes(path);
+        } catch (IOException e) {
+          throw new IllegalStateException(e);
+        }
+      });
+      assertInOrder(bytes);
+      Assert.assertEquals(10, results.get().getSize());
    }
    {
-      Iterable<byte[]> results =
-          job.query(new Path(outDir.getAbsolutePath())
-              , new Path(queryDir.getAbsolutePath())
-              , getTimestamp(0, pcapEntries)
-              , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
-              , 10
-              , new HashMap<String, String>() {{
-                put(Constants.Fields.DST_PORT.getName(), "22");
-              }}
-              , new Configuration()
-              , FileSystem.get(new Configuration())
-              , new FixedPcapFilter.Configurator()
-          );
-      assertInOrder(results);
-      Assert.assertTrue(Iterables.size(results) > 0);
-      Assert.assertEquals(Iterables.size(results)
+      PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator());
+      PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{
+        put(Constants.Fields.DST_PORT.getName(), "22");
+      }});
+      PcapOptions.NUM_RECORDS_PER_FILE.put(configuration, 1);
+      PcapJob<Map<String, String>> job = new PcapJob<>();
+      Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+      Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+      Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+      waitForJob(results);
+
+      Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+      Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+        try {
+          return HDFSUtils.readBytes(path);
+        } catch (IOException e) {
+          throw new IllegalStateException(e);
+        }
+      });
+      assertInOrder(bytes);
+      Assert.assertTrue(results.get().getSize() > 0);
+      Assert.assertEquals(Iterables.size(bytes)
           , Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
             @Override
             public boolean apply(@Nullable JSONObject input) {
@@ -397,74 +476,63 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
           )
       );
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      PcapMerger.merge(baos, Iterables.partition(results, 1).iterator().next());
-      Assert.assertTrue(baos.toByteArray().length > 0);
-    }
-    {
-      //test with query filter and byte array matching
-      Iterable<byte[]> results =
-          job.query(new Path(outDir.getAbsolutePath())
-              , new Path(queryDir.getAbsolutePath())
-              , getTimestamp(0, pcapEntries)
-              , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
-              , 10
-              , "BYTEARRAY_MATCHER('2f56abd814bc56420489ca38e7faf8cec3d4', packet)"
-              , new Configuration()
-              , FileSystem.get(new Configuration())
-              , new QueryPcapFilter.Configurator()
-          );
-      assertInOrder(results);
-      Assert.assertEquals(1, Iterables.size(results));
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      PcapMerger.merge(baos, Iterables.partition(results, 1).iterator().next());
+      PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
       Assert.assertTrue(baos.toByteArray().length > 0);
     }
     {
+      //same destination port test as before
       //test with query filter
-      Iterable<byte[]> results =
-          job.query(new Path(outDir.getAbsolutePath())
-              , new Path(queryDir.getAbsolutePath())
-              , getTimestamp(0, pcapEntries)
-              , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
-              , 10
-              , "ip_dst_port == 22"
-              , new Configuration()
-              , FileSystem.get(new Configuration())
-              , new QueryPcapFilter.Configurator()
-          );
-      assertInOrder(results);
-      Assert.assertTrue(Iterables.size(results) > 0);
-      Assert.assertEquals(Iterables.size(results)
+      PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+      PcapOptions.FIELDS.put(configuration, "ip_dst_port == 22");
+      PcapJob<String> job = new PcapJob<>();
+      Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+      Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+      Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+      waitForJob(results);
+
+      Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+      Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+        try {
+          return HDFSUtils.readBytes(path);
+        } catch (IOException e) {
+          throw new IllegalStateException(e);
+        }
+      });
+      assertInOrder(bytes);
+      Assert.assertEquals(Iterables.size(bytes)
           , Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
             @Override
             public boolean apply(@Nullable JSONObject input) {
               Object prt = input.get(Constants.Fields.DST_PORT.getName());
-              return prt != null && (Long) prt == 22;
+              return prt != null && prt.toString().equals("22");
             }
           }, withHeaders)
           )
       );
-      assertInOrder(results);
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      PcapMerger.merge(baos, Iterables.partition(results, 1).iterator().next());
+      PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
       Assert.assertTrue(baos.toByteArray().length > 0);
     }
     {
-      //test with query filter
-      Iterable<byte[]> results =
-          job.query(new Path(outDir.getAbsolutePath())
-              , new Path(queryDir.getAbsolutePath())
-              , getTimestamp(0, pcapEntries)
-              , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
-              , 10
-              , "ip_dst_port > 20 and ip_dst_port < 55792"
-              , new Configuration()
-              , FileSystem.get(new Configuration())
-              , new QueryPcapFilter.Configurator()
-          );
-      assertInOrder(results);
-      Assert.assertTrue(Iterables.size(results) > 0);
-      Assert.assertEquals(Iterables.size(results)
+      // test with query filter ip_dst_port > 20 and ip_dst_port < 55792
+      PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+      PcapOptions.FIELDS.put(configuration, "ip_dst_port > 20 and ip_dst_port < 55792");
+      PcapJob<String> job = new PcapJob<>();
+      Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+      Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+      Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+      waitForJob(results);
+
+      Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+      Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+        try {
+          return HDFSUtils.readBytes(path);
+        } catch (IOException e) {
+          throw new IllegalStateException(e);
+        }
+      });
+      assertInOrder(bytes);
+      Assert.assertEquals(Iterables.size(bytes)
           , Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
             @Override
             public boolean apply(@Nullable JSONObject input) {
@@ -474,63 +542,92 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
           }, withHeaders)
           )
       );
-      assertInOrder(results);
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      PcapMerger.merge(baos, Iterables.partition(results, 1).iterator().next());
+      PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
       Assert.assertTrue(baos.toByteArray().length > 0);
     }
     {
-      //test with query filter
-      Iterable<byte[]> results =
-          job.query(new Path(outDir.getAbsolutePath())
-              , new Path(queryDir.getAbsolutePath())
-              , getTimestamp(0, pcapEntries)
-              , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
-              , 10
-              , "ip_dst_port > 55790"
-              , new Configuration()
-              , FileSystem.get(new Configuration())
-              , new QueryPcapFilter.Configurator()
-          );
-      assertInOrder(results);
-      Assert.assertTrue(Iterables.size(results) > 0);
-      Assert.assertEquals(Iterables.size(results)
+      //test with query filter ip_dst_port > 55790
+      PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+      PcapOptions.FIELDS.put(configuration, "ip_dst_port > 55790");
+      PcapJob<String> job = new PcapJob<>();
+      Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+      Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+      Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+      waitForJob(results);
+
+      Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+      Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+        try {
+          return HDFSUtils.readBytes(path);
+        } catch (IOException e) {
+          throw new IllegalStateException(e);
+        }
+      });
+      assertInOrder(bytes);
+      Assert.assertEquals(Iterables.size(bytes)
           , Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
-            @Override
-            public boolean apply(@Nullable JSONObject input) {
-              Object prt = input.get(Constants.Fields.DST_PORT.getName());
-              return prt != null && (Long) prt > 55790;
-            }
-          }, withHeaders)
+                @Override
+                public boolean apply(@Nullable JSONObject input) {
+                  Object prt = input.get(Constants.Fields.DST_PORT.getName());
+                  return prt != null && (Long) prt > 55790;
+                }
+              }, withHeaders)
           )
       );
-      assertInOrder(results);
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      PcapMerger.merge(baos, Iterables.partition(results, 1).iterator().next());
+      PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
+      Assert.assertTrue(baos.toByteArray().length > 0);
+    }
+    {
+      //test with query filter and byte array matching
+      PcapOptions.FILTER_IMPL.put(configuration, new QueryPcapFilter.Configurator());
+      PcapOptions.FIELDS.put(configuration, "BYTEARRAY_MATCHER('2f56abd814bc56420489ca38e7faf8cec3d4', packet)");
+      PcapOptions.START_TIME_NS.put(configuration, getTimestamp(0, pcapEntries));
+      PcapOptions.END_TIME_NS.put(configuration, getTimestamp(pcapEntries.size()-1, pcapEntries) + 1);
+      PcapJob<String> job = new PcapJob<>();
+      Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration);
+      Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType());
+      Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState());
+      waitForJob(results);
+
+      Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState());
+      Iterable<byte[]> bytes = Iterables.transform(results.get(), path -> {
+        try {
+          return HDFSUtils.readBytes(path);
+        } catch (IOException e) {
+          throw new IllegalStateException(e);
+        }
+      });
+      assertInOrder(bytes);
+      Assert.assertEquals(1, results.get().getSize());
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      PcapMerger.merge(baos, HDFSUtils.readBytes(results.get().getPage(0)));
       Assert.assertTrue(baos.toByteArray().length > 0);
     }
+    System.out.println("Ended");
   } finally {
     runner.stop();
-    clearOutDir(outDir);
-    clearOutDir(queryDir);
+    clearOutDirs(inputDir, interimResultDir, outputDir);
   }
 }

-  private File getOutDir(String targetDir) {
-    File outDir = new File(new File(targetDir), DATA_DIR);
-    if (!outDir.exists()) {
-      outDir.mkdirs();
+  private void waitForJob(Statusable statusable) throws Exception {
+    for (int t = 0; t < MAX_RETRIES; ++t, Thread.sleep(SLEEP_MS)) {
+      if (statusable.isDone()) {
+        return;
+      }
     }
-    return outDir;
+    throw new Exception("Job did not complete within " + (MAX_RETRIES * SLEEP_MS) + " milliseconds");
   }

-  private File getQueryDir(String targetDir) {
-    File outDir = new File(new File(targetDir), QUERY_DIR);
-    if (!outDir.exists()) {
-      outDir.mkdirs();
+  private File getDir(String targetDir, String childDir) {
+    File directory = new File(new File(targetDir), childDir);
+    if (!directory.exists()) {
+      directory.mkdirs();
     }
-    return outDir;
+    return directory;
   }

   private static Iterable<Map.Entry<byte[], byte[]>> readPcaps(Path pcapFile, boolean withHeaders) throws IOException {
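[Condensed for reference: the asynchronous flow the test now exercises, using only types visible in this diff (FixedPcapConfig, PcapOptions, PcapJob, Statusable, Pageable, HDFSUtils). This is a sketch, not part of the commit; the paths, time range, and polling interval are illustrative.]

    Configuration hadoopConf = new Configuration();
    FixedPcapConfig config = new FixedPcapConfig(PcapCli.PREFIX_STRATEGY);
    PcapOptions.HADOOP_CONF.put(config, hadoopConf);
    PcapOptions.FILESYSTEM.put(config, FileSystem.get(hadoopConf));
    PcapOptions.BASE_PATH.put(config, new Path("target/pcap/data_dir"));          // illustrative
    PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, new Path("target/pcap/query"));
    PcapOptions.FINAL_OUTPUT_PATH.put(config, new Path("target/pcap/output"));
    PcapOptions.START_TIME_NS.put(config, 0L);                                    // illustrative range
    PcapOptions.END_TIME_NS.put(config, Long.MAX_VALUE);
    PcapOptions.NUM_REDUCERS.put(config, 10);
    PcapOptions.NUM_RECORDS_PER_FILE.put(config, 2);
    PcapOptions.FILTER_IMPL.put(config, new FixedPcapFilter.Configurator());
    PcapOptions.FIELDS.put(config, new HashMap<String, String>());

    PcapJob<Map<String, String>> job = new PcapJob<>();
    Statusable<Path> statusable = job.submit(PcapFinalizerStrategies.CLI, config);
    while (!statusable.isDone()) {          // poll the MapReduce job, as waitForJob() does above
      Thread.sleep(500);
    }
    Pageable<Path> pages = statusable.get();            // one Path per finalized pcap file
    for (int i = 0; i < pages.getSize(); i++) {
      byte[] pcap = HDFSUtils.readBytes(pages.getPage(i));
    }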
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
index 3468a7c..763f0c6 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
@@ -19,13 +19,9 @@ package org.apache.metron.pcap.query;

 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;

 import java.io.BufferedOutputStream;
@@ -35,28 +31,25 @@ import java.io.PrintStream;
 import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import java.util.Map.Entry;
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.hadoop.SequenceFileIterable;
 import org.apache.metron.common.system.Clock;
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.job.Finalizer;
 import org.apache.metron.pcap.PcapHelper;
-import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
-import org.apache.metron.pcap.filter.query.QueryPcapFilter;
+import org.apache.metron.pcap.config.FixedPcapConfig;
+import org.apache.metron.pcap.config.PcapConfig.PrefixStrategy;
+import org.apache.metron.pcap.config.PcapOptions;
 import org.apache.metron.pcap.mr.PcapJob;
-import org.apache.metron.pcap.writer.ResultsWriter;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
-import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;

 public class PcapCliTest {
@@ -64,16 +57,15 @@ public class PcapCliTest {
   @Mock
   private PcapJob jobRunner;
   @Mock
-  private ResultsWriter resultsWriter;
-  @Mock
   private Clock clock;
   private String execDir;
+  private PrefixStrategy prefixStrategy;

   @Before
   public void setup() throws IOException {
     MockitoAnnotations.initMocks(this);
-    doCallRealMethod().when(jobRunner).writeResults(anyObject(), anyObject(), anyObject(), anyInt(), anyObject());
     execDir = System.getProperty("user.dir");
+    prefixStrategy = clock -> "random_prefix";
   }

   @Test
@@ -88,13 +80,7 @@ public class PcapCliTest {
         "-protocol", "6",
         "-packet_filter", "`casey`"
     };
-    List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
-    Iterator iterator = pcaps.iterator();
-    SequenceFileIterable iterable = mock(SequenceFileIterable.class);
-    when(iterable.iterator()).thenReturn(iterator);
-    Path base_path = new Path(CliParser.BASE_PATH_DEFAULT);
-    Path base_output_path = new Path(CliParser.BASE_OUTPUT_PATH_DEFAULT);
     HashMap<String, String> query = new HashMap<String, String>() {{
       put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
       put(Constants.Fields.DST_ADDR.getName(), "192.168.1.2");
@@ -104,12 +90,44 @@ public class PcapCliTest {
       put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
       put(PcapHelper.PacketFields.PACKET_FILTER.getName(), "`casey`");
     }};
+    FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
+    PcapOptions.BASE_PATH.put(config, CliParser.BASE_PATH_DEFAULT);
+    PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, CliParser.BASE_INTERIM_OUTPUT_PATH_DEFAULT);
+    PcapOptions.FIELDS.put(config, query);
+    PcapOptions.NUM_REDUCERS.put(config, 10);
+    PcapOptions.START_TIME_MS.put(config, 500L);

-    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable);
+    when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);

-    PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix");
+    PcapCli cli = new PcapCli(jobRunner, prefixStrategy);
     assertThat("Expect no errors on run", cli.run(args), equalTo(0));
-    Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap"));
+    verify(jobRunner).get();
+  }
+
+  /**
+   * Checks that the entries of the expected "map" are present in the tested map "item". Note:
+   * this will not work for complex objects whose equals() does not compare contents favorably;
+   * e.g. Configurator() did not work.
+   */
+  private <K, V> Matcher<Map<K, V>> mapContaining(Map<K, V> map) {
+    return new TypeSafeMatcher<Map<K, V>>() {
+      @Override
+      protected boolean matchesSafely(Map<K, V> item) {
+        return item.entrySet().containsAll(map.entrySet());
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description.appendText("Should contain items: ");
+        for (Entry<K, V> entry : map.entrySet()) {
+          StringBuilder sb = new StringBuilder();
+          sb.append("key=");
+          sb.append(entry.getKey());
+          sb.append(",value=");
+          sb.append(entry.getValue());
+          description.appendText(sb.toString());
+        }
+      }
+    };
   }

   @Test
@@ -129,13 +147,6 @@ public class PcapCliTest {
         "-num_reducers", "10",
         "-records_per_file", "1000"
     };
-    List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
-    Iterator iterator = pcaps.iterator();
-    SequenceFileIterable iterable = mock(SequenceFileIterable.class);
-    when(iterable.iterator()).thenReturn(iterator);
-
-    Path base_path = new Path("/base/path");
-    Path base_output_path = new Path("/base/output/path");
     Map<String, String> query = new HashMap<String, String>() {{
       put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
       put(Constants.Fields.DST_ADDR.getName(), "192.168.1.2");
@@ -144,12 +155,20 @@ public class PcapCliTest {
       put(Constants.Fields.PROTOCOL.getName(), "6");
       put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true");
     }};
-
-    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable);
-
-    PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix");
+    FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
+    PcapOptions.BASE_PATH.put(config, "/base/path");
+    PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, "/base/output/path");
+    PcapOptions.FIELDS.put(config, query);
+    PcapOptions.NUM_REDUCERS.put(config, 10);
+    PcapOptions.START_TIME_MS.put(config, 500L);
+    PcapOptions.END_TIME_MS.put(config, 1000L);
+    PcapOptions.NUM_RECORDS_PER_FILE.put(config, 1000);
+
+    when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);
+
+    PcapCli cli = new PcapCli(jobRunner, prefixStrategy);
     assertThat("Expect no errors on run", cli.run(args), equalTo(0));
-    Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap"));
+    verify(jobRunner).get();
   }

   @Test
@@ -170,13 +189,6 @@ public class PcapCliTest {
         "-num_reducers", "10",
         "-records_per_file", "1000"
     };
-    List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
-    Iterator iterator = pcaps.iterator();
-    SequenceFileIterable iterable = mock(SequenceFileIterable.class);
-    when(iterable.iterator()).thenReturn(iterator);
-
-    Path base_path = new Path("/base/path");
-    Path base_output_path = new Path("/base/output/path");
     Map<String, String> query = new HashMap<String, String>() {{
       put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
       put(Constants.Fields.DST_ADDR.getName(), "192.168.1.2");
@@ -188,11 +200,23 @@ public class PcapCliTest {
     long startAsNanos = asNanos("2016-06-13-18:35.00", "yyyy-MM-dd-HH:mm.ss");
     long endAsNanos = asNanos("2016-06-15-18:35.00", "yyyy-MM-dd-HH:mm.ss");

-    when(jobRunner.query(eq(base_path), eq(base_output_path), eq(startAsNanos), eq(endAsNanos), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(FixedPcapFilter.Configurator.class))).thenReturn(iterable);
-    PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix");
+    FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
+    PcapOptions.BASE_PATH.put(config, "/base/path");
+    PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, "/base/output/path");
+    PcapOptions.FIELDS.put(config, query);
+    PcapOptions.NUM_REDUCERS.put(config, 10);
+    PcapOptions.START_TIME_MS.put(config, startAsNanos / 1000000L); // needed because of defaults in the config
+    PcapOptions.END_TIME_MS.put(config, endAsNanos / 1000000L); // needed because of defaults in the config
+    PcapOptions.START_TIME_NS.put(config, startAsNanos);
+    PcapOptions.END_TIME_NS.put(config, endAsNanos);
+    PcapOptions.NUM_RECORDS_PER_FILE.put(config, 1000);
+
+    when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);
+
+    PcapCli cli = new PcapCli(jobRunner, prefixStrategy);
     assertThat("Expect no errors on run", cli.run(args), equalTo(0));
-    Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap"));
+    verify(jobRunner).get();
   }

   private long asNanos(String inDate, String format) throws ParseException {
@@ -212,20 +236,20 @@ public class PcapCliTest {
         "-start_time", "500",
         "-query", "some query string"
     };
-    List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
-    Iterator iterator = pcaps.iterator();
-    SequenceFileIterable iterable = mock(SequenceFileIterable.class);
-    when(iterable.iterator()).thenReturn(iterator);
-    Path base_path = new Path(CliParser.BASE_PATH_DEFAULT);
-    Path base_output_path = new Path(CliParser.BASE_OUTPUT_PATH_DEFAULT);
     String query = "some query string";
+    FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
+    PcapOptions.BASE_PATH.put(config, CliParser.BASE_PATH_DEFAULT);
+    PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, CliParser.BASE_INTERIM_OUTPUT_PATH_DEFAULT);
+    PcapOptions.FIELDS.put(config, query);
+    PcapOptions.NUM_REDUCERS.put(config, 10);
+    PcapOptions.START_TIME_MS.put(config, 500L);

-    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable);
+    when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);

-    PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix");
+    PcapCli cli = new PcapCli(jobRunner, prefixStrategy);
     assertThat("Expect no errors on run", cli.run(args), equalTo(0));
-    Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap"));
+    verify(jobRunner).get();
   }

   @Test
@@ -240,20 +264,22 @@ public class PcapCliTest {
         "-query", "some query string",
         "-records_per_file", "1000"
     };
-    List<byte[]> pcaps = Arrays.asList(new byte[][]{asBytes("abc"), asBytes("def"), asBytes("ghi")});
-    Iterator iterator = pcaps.iterator();
-    SequenceFileIterable iterable = mock(SequenceFileIterable.class);
-    when(iterable.iterator()).thenReturn(iterator);
-    Path base_path = new Path("/base/path");
-    Path base_output_path = new Path("/base/output/path");
     String query = "some query string";
-
-    when(jobRunner.query(eq(base_path), eq(base_output_path), anyLong(), anyLong(), anyInt(), eq(query), isA(Configuration.class), isA(FileSystem.class), isA(QueryPcapFilter.Configurator.class))).thenReturn(iterable);
-
-    PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix");
+    FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
+    PcapOptions.BASE_PATH.put(config, "/base/path");
+    PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, "/base/output/path");
+    PcapOptions.FIELDS.put(config, query);
+    PcapOptions.NUM_REDUCERS.put(config, 10);
+    PcapOptions.START_TIME_MS.put(config, 500L); // needed because of defaults in the config
+    PcapOptions.END_TIME_MS.put(config, 1000L); // needed because of defaults in the config
+    PcapOptions.NUM_RECORDS_PER_FILE.put(config, 1000);
+
+    when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);
+
+    PcapCli cli = new PcapCli(jobRunner, prefixStrategy);
     assertThat("Expect no errors on run", cli.run(args), equalTo(0));
-    Mockito.verify(resultsWriter).write(isA(Configuration.class), eq(pcaps), eq("file:" + execDir + "/pcap-data-random_prefix+0001.pcap"));
+    verify(jobRunner).get();
   }

   // INVALID OPTION CHECKS
@@ -290,7 +316,7 @@ public class PcapCliTest {
     PrintStream errOutStream = new PrintStream(new BufferedOutputStream(ebos));
     System.setErr(errOutStream);

-    PcapCli cli = new PcapCli(jobRunner, resultsWriter, clock -> "random_prefix");
+    PcapCli cli = new PcapCli(jobRunner, clock -> "random_prefix");
     assertThat("Expect errors on run", cli.run(args), equalTo(-1));
     assertThat("Expect missing required option error: " + ebos.toString(), ebos.toString().contains(optMsg), equalTo(true));
     assertThat("Expect usage to be printed: " + bos.toString(), bos.toString().contains("usage: " + type + " filter options"), equalTo(true));
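[The mapContaining matcher above relies on entry-set containment, so the stub tolerates extra options PcapCli layers on top of the expected ones. A toy illustration of that check (standalone sketch, not part of the commit; keys and values are illustrative):]

    Map<String, Object> expected = new HashMap<>();
    expected.put("basePath", "/base/path");
    expected.put("numReducers", 10);

    Map<String, Object> actual = new HashMap<>(expected);
    actual.put("hadoopConf", new Object());  // extra entries do not break the match

    boolean matches = actual.entrySet().containsAll(expected.entrySet());  // true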
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java
deleted file mode 100644
index 997c5f7..0000000
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFiles.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.pcap;
-
-import java.util.List;
-import org.apache.hadoop.fs.Path;
-import org.apache.metron.job.Pageable;
-
-public class PcapFiles implements Pageable<Path> {
-
-  private List<Path> files;
-
-  public PcapFiles(List<Path> files) {
-    this.files = files;
-  }
-
-  @Override
-  public Iterable<Path> asIterable() {
-    return files;
-  }
-
-  @Override
-  public Path getPage(int num) {
-    return files.get(num);
-  }
-}
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPages.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPages.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPages.java
new file mode 100644
index 0000000..c98e681
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPages.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.job.Pageable;
+
+public class PcapPages implements Pageable<Path> {
+
+  private final List<Path> files;
+
+  /**
+   * Copy constructor.
+   */
+  public PcapPages(Pageable<Path> pages) {
+    this.files = new ArrayList<>();
+    for (Path path : pages) {
+      files.add(new Path(path.toString()));
+    }
+  }
+
+  /**
+   * Defaults with empty list.
+   */
+  public PcapPages() {
+    this.files = new ArrayList<>();
+  }
+
+  public PcapPages(List<Path> paths) {
+    files = new ArrayList<>(paths);
+  }
+
+  @Override
+  public Path getPage(int num) {
+    return files.get(num);
+  }
+
+  @Override
+  public int getSize() {
+    return files.size();
+  }
+
+  @Override
+  public Iterator<Path> iterator() {
+    return new PcapIterator(files.iterator());
+  }
+
+  private class PcapIterator implements Iterator<Path> {
+
+    private Iterator<Path> delegateIt;
+
+    public PcapIterator(Iterator<Path> iterator) {
+      this.delegateIt = iterator;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return delegateIt.hasNext();
+    }
+
+    @Override
+    public Path next() {
+      return delegateIt.next();
+    }
+  }
+
+}
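[PcapPages replaces the deleted PcapFiles as the Pageable<Path> handed back by job results; note the old asIterable() is gone, the class is directly iterable and exposes getSize(). A small usage sketch with illustrative paths, not part of the commit:]

    Pageable<Path> pages = new PcapPages(Arrays.asList(
        new Path("/output/pcap-data-prefix+0001.pcap"),
        new Path("/output/pcap-data-prefix+0002.pcap")));
    int numPages = pages.getSize();   // 2
    Path first = pages.getPage(0);    // random access by page number
    for (Path p : pages) {            // iteration via the delegating iterator
      System.out.println(p);
    }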
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/FixedPcapConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/FixedPcapConfig.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/FixedPcapConfig.java
new file mode 100644
index 0000000..c40407b
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/FixedPcapConfig.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.config;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class FixedPcapConfig extends PcapConfig {
+
+  public FixedPcapConfig(PrefixStrategy prefixStrategy) {
+    super(prefixStrategy);
+    setFixedFields(new LinkedHashMap<>());
+  }
+
+  public Map<String, String> getFixedFields() {
+    return PcapOptions.FIELDS.get(this, Map.class);
+  }
+
+  public void setFixedFields(Map<String, String> fixedFields) {
+    PcapOptions.FIELDS.put(this, fixedFields);
+  }
+
+  public void putFixedField(String key, String value) {
+    Map<String, String> fixedFields = PcapOptions.FIELDS.get(this, Map.class);
+    String trimmedVal = value != null ? value.trim() : null;
+    if (!isNullOrEmpty(trimmedVal)) {
+      fixedFields.put(key, value);
+    }
+  }
+
+}
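[A brief behavioral sketch of FixedPcapConfig (not part of the commit; the prefix lambda and field values are illustrative): fixed fields live under PcapOptions.FIELDS, and putFixedField skips keys whose value is null or blank after trimming.]

    FixedPcapConfig config = new FixedPcapConfig(clock -> "prefix");
    config.putFixedField("ip_src_addr", "192.168.1.1");  // stored
    config.putFixedField("protocol", "   ");             // blank after trim, skipped
    config.putFixedField("ip_dst_addr", null);           // null, skipped
    Map<String, String> fields = config.getFixedFields(); // {ip_src_addr=192.168.1.1}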
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java
new file mode 100644
index 0000000..26509be
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.config;
+
+import org.apache.commons.collections4.map.AbstractMapDecorator;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.common.system.Clock;
+import org.apache.metron.common.configuration.ConfigOption;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.function.Function;
+
+public class PcapConfig extends AbstractMapDecorator<String, Object> {
+  public interface PrefixStrategy extends Function<Clock, String> {}
+
+  private boolean showHelp;
+  private DateFormat dateFormat;
+
+  public PcapConfig() {
+    super(new HashMap<>());
+  }
+
+  public PcapConfig(PrefixStrategy prefixStrategy) {
+    this();
+    setShowHelp(false);
+    setBasePath("");
+    setBaseInterimResultPath("");
+    setStartTimeMs(-1L);
+    setEndTimeMs(-1L);
+    setNumReducers(0);
+    setFinalFilenamePrefix(prefixStrategy.apply(new Clock()));
+  }
+
+  public Object getOption(ConfigOption option) {
+    Object o = get(option.getKey());
+    return option.transform().apply(option.getKey(), o);
+  }
+
+  public String getFinalFilenamePrefix() {
+    return PcapOptions.FINAL_FILENAME_PREFIX.get(this, String.class);
+  }
+
+  public void setFinalFilenamePrefix(String prefix) {
+    PcapOptions.FINAL_FILENAME_PREFIX.put(this, prefix);
+  }
+
+  public int getNumReducers() {
+    return PcapOptions.NUM_REDUCERS.get(this, Integer.class);
+  }
+
+  public boolean showHelp() {
+    return showHelp;
+  }
+
+  public void setShowHelp(boolean showHelp) {
+    this.showHelp = showHelp;
+  }
+
+  public String getBasePath() {
+    return PcapOptions.BASE_PATH.get(this, String.class);
+  }
+
+  public String getBaseInterimResultPath() {
+    return PcapOptions.BASE_INTERIM_RESULT_PATH.get(this, String.class);
+  }
+
+  public long getStartTimeMs() {
+    return PcapOptions.START_TIME_MS.get(this, Long.class);
+  }
+
+  public long getEndTimeMs() {
+    return PcapOptions.END_TIME_MS.get(this, Long.class);
+  }
+
+  public void setBasePath(String basePath) {
+    PcapOptions.BASE_PATH.put(this, basePath);
+  }
+
+  public void setBaseInterimResultPath(String baseOutputPath) {
+    PcapOptions.BASE_INTERIM_RESULT_PATH.put(this, baseOutputPath);
+  }
+
+  public void setStartTimeMs(long startTime) {
+    PcapOptions.START_TIME_MS.put(this, startTime);
+  }
+
+  public void setEndTimeMs(long endTime) {
+    PcapOptions.END_TIME_MS.put(this, endTime);
+  }
+
+  public boolean isNullOrEmpty(String val) {
+    return StringUtils.isEmpty(val);
+  }
+
+  public void setDateFormat(String dateFormat) {
+    this.dateFormat = new SimpleDateFormat(dateFormat);
+  }
+
+  public DateFormat getDateFormat() {
+    return dateFormat;
+  }
+
+  public void setNumReducers(int numReducers) {
+    PcapOptions.NUM_REDUCERS.put(this, numReducers);
+  }
+
+  public int getNumRecordsPerFile() {
+    return PcapOptions.NUM_RECORDS_PER_FILE.get(this, Integer.class);
+  }
+
+  public void setNumRecordsPerFile(int numRecordsPerFile) {
+    PcapOptions.NUM_RECORDS_PER_FILE.put(this, numRecordsPerFile);
+  }
+}
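[Since PcapConfig decorates a plain Map<String, Object>, the typed accessors and raw map views stay in sync. A minimal sketch (not part of the commit; values illustrative):]

    PcapConfig config = new PcapConfig(clock -> "prefix");
    config.setNumReducers(10);
    int typed = config.getNumReducers();                        // 10
    Object raw = config.get(PcapOptions.NUM_REDUCERS.getKey()); // same entry via the decorated map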
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
new file mode 100644
index 0000000..09effd4
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.config;
+
+import java.util.function.BiFunction;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.common.configuration.ConfigOption;
+
+public enum PcapOptions implements ConfigOption {
+  JOB_NAME("jobName"),
+  FINAL_FILENAME_PREFIX("finalFilenamePrefix"),
+  BASE_PATH("basePath", (s, o) -> o == null ? null : new Path(o.toString())),
+  INTERIM_RESULT_PATH("interimResultPath", (s, o) -> o == null ? null : new Path(o.toString())),
+  BASE_INTERIM_RESULT_PATH("baseInterimResultPath", (s, o) -> o == null ? null : new Path(o.toString())),
+  FINAL_OUTPUT_PATH("finalOutputPath", (s, o) -> o == null ? null : new Path(o.toString())),
+  NUM_REDUCERS("numReducers"),
+  START_TIME_MS("startTimeMs"),
+  END_TIME_MS("endTimeMs"),
+  START_TIME_NS("startTimeNs"),
+  END_TIME_NS("endTimeNs"),
+  NUM_RECORDS_PER_FILE("numRecordsPerFile"),
+  FIELDS("fields"),
+  FILTER_IMPL("filterImpl"),
+  HADOOP_CONF("hadoopConf"),
+  FILESYSTEM("fileSystem");
+
+  public static final BiFunction<String, Object, Path> STRING_TO_PATH =
+      (s, o) -> o == null ? null : new Path(o.toString());
+  private String key;
+  private BiFunction<String, Object, Object> transform = (s, o) -> o;
+
+  PcapOptions(String key) {
+    this.key = key;
+  }
+
+  PcapOptions(String key, BiFunction<String, Object, Object> transform) {
+    this.key = key;
+    this.transform = transform;
+  }
+
+  @Override
+  public String getKey() {
+    return key;
+  }
+
+  @Override
+  public BiFunction<String, Object, Object> transform() {
+    return transform;
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/QueryPcapConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/QueryPcapConfig.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/QueryPcapConfig.java
new file mode 100644
index 0000000..ef32839
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/QueryPcapConfig.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.config;
+
+public class QueryPcapConfig extends PcapConfig {
+
+  public QueryPcapConfig(PrefixStrategy prefixStrategy) {
+    super(prefixStrategy);
+  }
+
+  public String getQuery() {
+    return PcapOptions.FIELDS.get(this, String.class);
+  }
+
+  public void setQuery(String query) {
+    PcapOptions.FIELDS.put(this, query);
+  }
+}
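[The per-option transform in PcapOptions lets callers store path-like options as plain strings and read them back as Hadoop Paths, mirroring how PcapCliFinalizer reads FINAL_OUTPUT_PATH below. A sketch (not part of the commit; the path value is illustrative):]

    QueryPcapConfig config = new QueryPcapConfig(clock -> "prefix");
    config.setQuery("ip_dst_port == 22");
    PcapOptions.BASE_PATH.put(config, "/apps/metron/pcap/input");  // stored as a String
    Path base = PcapOptions.BASE_PATH.get(config, PcapOptions.STRING_TO_PATH, Path.class);  // read back as a Path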
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.finalizer;
+
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.pcap.config.PcapOptions;
+
+/**
+ * Write to local FS.
+ */
+public class PcapCliFinalizer extends PcapFinalizer {
+
+  /**
+   * The output file name will have the format <output-path>/pcap-data-<filename-prefix>+<partition-num>.pcap.
+   * The filename prefix is pluggable, but in most cases it will be provided via the PcapConfig
+   * as a formatted timestamp + uuid. A sample final name looks as follows:
+   * /base/output/path/pcap-data-201807181911-09855b4ae3204dee8b63760d65198da3+0001.pcap
+   */
+  private static final String PCAP_CLI_FILENAME_FORMAT = "%s/pcap-data-%s+%04d.pcap";
+
+  @Override
+  protected String getOutputFileName(Map<String, Object> config, int partition) {
+    Path finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.get(config, PcapOptions.STRING_TO_PATH, Path.class);
+    String prefix = PcapOptions.FINAL_FILENAME_PREFIX.get(config, String.class);
+    return String.format(PCAP_CLI_FILENAME_FORMAT, finalOutputPath, prefix, partition);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
new file mode 100644
index 0000000..d5ac675
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.finalizer;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.metron.common.hadoop.SequenceFileIterable;
+import org.apache.metron.job.Finalizer;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.Pageable;
+import org.apache.metron.pcap.PcapPages;
+import org.apache.metron.pcap.config.PcapOptions;
+import org.apache.metron.pcap.writer.PcapResultsWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Takes pcap results from a specified interim result path; for PCAP, these results are
+ * assumed to be SequenceFileIterables. The results are partitioned according to the
+ * configured number of records per file, and each partition is written, with a PCAP
+ * header, to its own file in the final output location. The interim MapReduce results
+ * are cleaned up after the final results are successfully written.
+ */
+public abstract class PcapFinalizer implements Finalizer<Path> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private PcapResultsWriter resultsWriter;
+
+  protected PcapFinalizer() {
+    this.resultsWriter = new PcapResultsWriter();
+  }
+
+  protected PcapResultsWriter getResultsWriter() {
+    return resultsWriter;
+  }
+
+  @Override
+  public Pageable<Path> finalizeJob(Map<String, Object> config) throws JobException {
+    Configuration hadoopConfig = PcapOptions.HADOOP_CONF.get(config, Configuration.class);
+    int recPerFile = PcapOptions.NUM_RECORDS_PER_FILE.get(config, Integer.class);
+    Path interimResultPath = PcapOptions.INTERIM_RESULT_PATH
+        .get(config, PcapOptions.STRING_TO_PATH, Path.class);
+    FileSystem fs = PcapOptions.FILESYSTEM.get(config, FileSystem.class);
+
+    SequenceFileIterable interimResults = null;
+    try {
+      interimResults = readInterimResults(interimResultPath, hadoopConfig, fs);
+    } catch (IOException e) {
+      throw new JobException("Unable to read interim job results while finalizing", e);
+    }
+    List<Path> outFiles = new ArrayList<>();
+    try {
+      Iterable<List<byte[]>> partitions = Iterables.partition(interimResults, recPerFile);
+      int part = 1;
+      if (partitions.iterator().hasNext()) {
+        for (List<byte[]> data : partitions) {
+          String outFileName = getOutputFileName(config, part++);
+          if (data.size() > 0) {
+            getResultsWriter().write(hadoopConfig, data, outFileName);
+            outFiles.add(new Path(outFileName));
+          }
+        }
+      } else {
+        LOG.info("No results returned.");
+      }
+    } catch (IOException e) {
+      throw new JobException("Failed to finalize results", e);
+    } finally {
+      try {
+        interimResults.cleanup();
+      } catch (IOException e) {
+        LOG.warn("Unable to cleanup files in HDFS", e);
+      }
+    }
+    return new PcapPages(outFiles);
+  }
+
+  protected abstract String getOutputFileName(Map<String, Object> config, int partition);
+
+  /**
+   * Returns a lazily-read Iterable over a set of sequence files.
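+   *
+   * <p>The MapReduce "_SUCCESS" marker found under the interim result path is deleted
+   * rather than treated as a result file, and the remaining part files are sorted
+   * lexicographically by name so that records are iterated in a deterministic order.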
+   */
+  protected SequenceFileIterable readInterimResults(Path interimResultPath, Configuration config,
+      FileSystem fs) throws IOException {
+    List<Path> files = new ArrayList<>();
+    for (RemoteIterator<LocatedFileStatus> it = fs.listFiles(interimResultPath, false);
+        it.hasNext(); ) {
+      Path p = it.next().getPath();
+      if (p.getName().equals("_SUCCESS")) {
+        fs.delete(p, false);
+        continue;
+      }
+      files.add(p);
+    }
+    if (files.size() == 0) {
+      LOG.info("No files to process with specified date range.");
+    } else {
+      LOG.debug("Interim results path={}", interimResultPath);
+      Collections.sort(files, (o1, o2) -> o1.getName().compareTo(o2.getName()));
+    }
+    return new SequenceFileIterable(files, config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizerStrategies.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizerStrategies.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizerStrategies.java
new file mode 100644
index 0000000..927d602
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizerStrategies.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.finalizer;
+
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.job.Finalizer;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.Pageable;
+
+/**
+ * PcapJob runs a MapReduce job that outputs Sequence Files to HDFS. This Strategy/Factory class
+ * provides options for doing final processing on this raw MapReduce output for the CLI and REST
+ * APIs.
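+ *
+ * <p>A minimal usage sketch (assuming the config map has already been populated via
+ * PcapOptions, as in PcapJob):
+ *
+ * <pre>{@code
+ * Finalizer<Path> finalizer = PcapFinalizerStrategies.CLI;
+ * Pageable<Path> pages = finalizer.finalizeJob(config);
+ * }</pre>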
+ */
+public enum PcapFinalizerStrategies implements Finalizer<Path> {
+  CLI(new PcapCliFinalizer()),
+  REST(new PcapRestFinalizer());
+
+  private Finalizer<Path> finalizer;
+
+  PcapFinalizerStrategies(Finalizer<Path> finalizer) {
+    this.finalizer = finalizer;
+  }
+
+  @Override
+  public Pageable<Path> finalizeJob(Map<String, Object> config) throws JobException {
+    return finalizer.finalizeJob(config);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
new file mode 100644
index 0000000..059bba2
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.finalizer;
+
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.pcap.config.PcapOptions;
+
+/**
+ * Write to HDFS.
+ */
+public class PcapRestFinalizer extends PcapFinalizer {
+
+  /**
+   * The output file name will have the format <output-path>/page-<page-num>.pcap.
+   * Unlike the CLI finalizer, no filename prefix is used; each partition becomes one
+   * numbered page. A sample final name looks as follows:
+   * /base/output/path/page-1.pcap
+   */
+  private static final String PCAP_REST_FILENAME_FORMAT = "%s/page-%s.pcap";
+
+  @Override
+  protected String getOutputFileName(Map<String, Object> config, int partition) {
+    Path finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.getTransformed(config, Path.class);
+    return String.format(PCAP_REST_FILENAME_FORMAT, finalOutputPath, partition);
+  }
+
+}
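
For concreteness, here are the two naming schemes side by side, applied to hypothetical values. This sketch only exercises the format strings defined in the finalizers above; it is not part of the commit:

    public class FilenameFormatSketch {
      public static void main(String[] args) {
        // CLI scheme: <output-path>/pcap-data-<filename-prefix>+<partition-num>.pcap
        String cli = String.format("%s/pcap-data-%s+%04d.pcap",
            "/base/output/path", "201807181911-09855b4ae3204dee8b63760d65198da3", 1);
        System.out.println(cli);
        // -> /base/output/path/pcap-data-201807181911-09855b4ae3204dee8b63760d65198da3+0001.pcap

        // REST scheme: <output-path>/page-<page-num>.pcap
        String rest = String.format("%s/page-%s.pcap", "/base/output/path", 2);
        System.out.println(rest);
        // -> /base/output/path/page-2.pcap
      }
    }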
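A note on the PcapOptions pattern used throughout these files: each enum constant pairs a config key with an optional transform, so callers can store a raw value and read it back coerced to the type they need. A minimal sketch, assuming ConfigOption's put/get accept any Map<String, Object> (the finalizer code above reads from a plain Map; in the codebase the map is normally a PcapConfig):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.fs.Path;
    import org.apache.metron.pcap.config.PcapOptions;

    public class PcapOptionsSketch {
      public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        // Values are stored raw...
        PcapOptions.FINAL_OUTPUT_PATH.put(config, "/base/output/path");
        // ...and coerced on read; STRING_TO_PATH turns the stored value into a Path.
        Path out = PcapOptions.FINAL_OUTPUT_PATH.get(config, PcapOptions.STRING_TO_PATH, Path.class);
        System.out.println(out);
      }
    }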