METRON-1731: PCAP - Escape colons in output dir names (mmiklavc via mmiklavc) closes apache/metron#1155
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/73dc63e6 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/73dc63e6 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/73dc63e6 Branch: refs/remotes/apache/feature/METRON-1699-create-batch-profiler Commit: 73dc63e671b55d22d251f4be1c217259f4f5dc71 Parents: 05316a4 Author: mmiklavc <michael.miklav...@gmail.com> Authored: Fri Aug 10 12:42:47 2018 -0600 Committer: Michael Miklavcic <michael.miklav...@gmail.com> Committed: Fri Aug 10 12:42:47 2018 -0600 ---------------------------------------------------------------------- .../apache/metron/pcap/FixedPcapFilterTest.java | 286 ------------------ .../org/apache/metron/pcap/PcapJobTest.java | 290 ------------------- .../apache/metron/pcap/QueryPcapFilterTest.java | 228 --------------- .../pcap/filter/fixed/FixedPcapFilter.java | 14 +- .../pcap/filter/query/QueryPcapFilter.java | 17 +- .../metron/pcap/mr/OutputDirFormatter.java | 37 +++ .../java/org/apache/metron/pcap/mr/PcapJob.java | 5 +- .../pcap/filter/fixed/FixedPcapFilterTest.java | 271 ++++++++++++++++- .../pcap/filter/query/QueryPcapFilterTest.java | 207 ++++++++++++- .../metron/pcap/mr/OutputDirFormatterTest.java | 62 ++++ .../org/apache/metron/pcap/mr/PcapJobTest.java | 290 +++++++++++++++++++ 11 files changed, 877 insertions(+), 830 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java deleted file mode 100644 index 84969d3..0000000 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java +++ /dev/null @@ -1,286 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.pcap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.metron.common.Constants; -import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; -import org.junit.Assert; -import org.junit.Test; - -import java.util.HashMap; -import java.util.Map; - -public class FixedPcapFilterTest { - @Test - public void testTrivialEquality() throws Exception { - Configuration config = new Configuration(); - final Map<String, String> fields = new HashMap<String, String>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), "0"); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), "1"); - put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false"); - }}; - new FixedPcapFilter.Configurator().addToConfig(fields, config); - { - FixedPcapFilter filter = new FixedPcapFilter() { - @Override - protected Map<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), 0); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), 1); - }}; - } - }; - filter.configure(config); - Assert.assertTrue(filter.test(null)); - } - } - - @Test - public void testReverseTraffic() throws Exception { - Configuration config = new Configuration(); - final Map<String, String> fields = new HashMap<String, String>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), "0"); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), "1"); - put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true"); - }}; - new FixedPcapFilter.Configurator().addToConfig(fields, config); - { - FixedPcapFilter filter = new FixedPcapFilter() { - @Override - protected Map<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), 0); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), 1); - }}; - } - }; - filter.configure(config); - Assert.assertTrue(filter.test(null)); - } - new FixedPcapFilter.Configurator().addToConfig(fields, config); - { - FixedPcapFilter filter = new FixedPcapFilter() { - @Override - protected Map<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "dst_ip"); - put(Constants.Fields.SRC_PORT.getName(), 1); - put(Constants.Fields.DST_ADDR.getName(), "src_ip"); - put(Constants.Fields.DST_PORT.getName(), 0); - }}; - } - }; - filter.configure(config); - Assert.assertTrue(filter.test(null)); - } - new FixedPcapFilter.Configurator().addToConfig(fields, config); - { - FixedPcapFilter filter = new FixedPcapFilter() { - @Override - protected Map<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "dst_ip"); - put(Constants.Fields.SRC_PORT.getName(), 0); - put(Constants.Fields.DST_ADDR.getName(), "src_ip"); - put(Constants.Fields.DST_PORT.getName(), 1); - }}; - } - }; - filter.configure(config); - Assert.assertFalse(filter.test(null)); - } - } -@Test -public void testMissingDstAddr() throws Exception { - Configuration config = new Configuration(); - final HashMap<String, String> fields = new HashMap<String, String>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), "0"); - put(Constants.Fields.DST_PORT.getName(), "1"); - put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false"); - }}; - new FixedPcapFilter.Configurator().addToConfig(fields, config); - { - FixedPcapFilter filter = new FixedPcapFilter() { - @Override - protected HashMap<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), 0); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), 1); - }}; - } - }; - filter.configure(config); - Assert.assertTrue(filter.test(null)); - } - new FixedPcapFilter.Configurator().addToConfig(fields, config); - { - FixedPcapFilter filter = new FixedPcapFilter() { - @Override - protected HashMap<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip1"); - put(Constants.Fields.SRC_PORT.getName(), 0); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), 1); - }}; - } - }; - filter.configure(config); - Assert.assertFalse(filter.test(null)); - } -} - @Test - public void testMissingDstPort() throws Exception { - Configuration config = new Configuration(); - final HashMap<String, String> fields = new HashMap<String, String>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), "0"); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false"); - }}; - new FixedPcapFilter.Configurator().addToConfig(fields, config); - { - FixedPcapFilter filter = new FixedPcapFilter() { - @Override - protected HashMap<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), 0); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), 1); - }}; - } - }; - filter.configure(config); - Assert.assertTrue(filter.test(null)); - } - new FixedPcapFilter.Configurator().addToConfig(fields, config); - { - FixedPcapFilter filter = new FixedPcapFilter() { - @Override - protected HashMap<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), 0); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), 100); - }}; - } - }; - filter.configure(config); - Assert.assertTrue(filter.test(null)); - } - new FixedPcapFilter.Configurator().addToConfig(fields, config); - { - FixedPcapFilter filter = new FixedPcapFilter() { - @Override - protected HashMap<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), 100); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), 100); - }}; - } - }; - filter.configure(config); - Assert.assertFalse(filter.test(null)); - } - } - @Test - public void testMissingSrcAddr() throws Exception { - Configuration config = new Configuration(); - final HashMap<String, String> fields = new HashMap<String, String>() {{ - put(Constants.Fields.SRC_PORT.getName(), "0"); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), "1"); - put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false"); - }}; - new FixedPcapFilter.Configurator().addToConfig(fields, config); - { - FixedPcapFilter filter = new FixedPcapFilter() { - @Override - protected HashMap<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), 0); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), 1); - }}; - } - }; - filter.configure(config); - Assert.assertTrue(filter.test(null)); - } - } - @Test - public void testMissingSrcPort() throws Exception { - Configuration config = new Configuration(); - final HashMap<String, String> fields = new HashMap<String, String>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), "1"); - put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false"); - }}; - new FixedPcapFilter.Configurator().addToConfig(fields, config); - { - FixedPcapFilter filter = new FixedPcapFilter() { - @Override - protected HashMap<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), 0); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), 1); - }}; - } - }; - filter.configure(config); - Assert.assertTrue(filter.test(null)); - } - new FixedPcapFilter.Configurator().addToConfig(fields, config); - { - FixedPcapFilter filter = new FixedPcapFilter() { - @Override - protected HashMap<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), 100); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), 1); - }}; - } - }; - filter.configure(config); - Assert.assertTrue(filter.test(null)); - } - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java deleted file mode 100644 index 796c8a5..0000000 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java +++ /dev/null @@ -1,290 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.pcap; - -import static java.lang.Long.toUnsignedString; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Timer; -import java.util.TimerTask; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.metron.common.utils.timestamp.TimestampConverters; -import org.apache.metron.job.Finalizer; -import org.apache.metron.job.JobStatus; -import org.apache.metron.job.JobStatus.State; -import org.apache.metron.job.Pageable; -import org.apache.metron.job.Statusable; -import org.apache.metron.pcap.config.FixedPcapConfig; -import org.apache.metron.pcap.config.PcapOptions; -import org.apache.metron.pcap.filter.PcapFilterConfigurator; -import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; -import org.apache.metron.pcap.mr.PcapJob; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -public class PcapJobTest { - - @Mock - private Job mrJob; - @Mock - private org.apache.hadoop.mapreduce.JobStatus mrStatus; - @Mock - private JobID jobId; - @Mock - private Finalizer<Path> finalizer; - private TestTimer timer; - private Pageable<Path> pageableResult; - private FixedPcapConfig config; - private Configuration hadoopConfig; - private FileSystem fileSystem; - private String jobIdVal = "job_abc_123"; - private Path basePath; - private Path baseOutPath; - private long startTime; - private long endTime; - private int numReducers; - private int numRecordsPerFile; - private Path finalOutputPath; - private Map<String, String> fixedFields; - private PcapJob<Map<String, String>> testJob; - - @Before - public void setup() throws IOException { - MockitoAnnotations.initMocks(this); - basePath = new Path("basepath"); - baseOutPath = new Path("outpath"); - startTime = 100; - endTime = 200; - numReducers = 5; - numRecordsPerFile = 5; - fixedFields = new HashMap<>(); - fixedFields.put("ip_src_addr", "192.168.1.1"); - hadoopConfig = new Configuration(); - fileSystem = FileSystem.get(hadoopConfig); - finalOutputPath = new Path("finaloutpath"); - when(jobId.toString()).thenReturn(jobIdVal); - when(mrStatus.getJobID()).thenReturn(jobId); - when(mrJob.getJobID()).thenReturn(jobId); - pageableResult = new PcapPages(); - timer = new TestTimer(); - // handles setting the file name prefix under the hood - config = new FixedPcapConfig(clock -> "clockprefix"); - PcapOptions.HADOOP_CONF.put(config, hadoopConfig); - PcapOptions.FILESYSTEM.put(config, FileSystem.get(hadoopConfig)); - PcapOptions.BASE_PATH.put(config, basePath); - PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, baseOutPath); - PcapOptions.START_TIME_NS.put(config, startTime); - PcapOptions.END_TIME_NS.put(config, endTime); - PcapOptions.NUM_REDUCERS.put(config, numReducers); - PcapOptions.FIELDS.put(config, fixedFields); - PcapOptions.FILTER_IMPL.put(config, new FixedPcapFilter.Configurator()); - PcapOptions.NUM_RECORDS_PER_FILE.put(config, numRecordsPerFile); - PcapOptions.FINAL_OUTPUT_PATH.put(config, finalOutputPath); - testJob = new TestJob<>(mrJob); - testJob.setStatusInterval(1); - testJob.setCompleteCheckInterval(1); - testJob.setTimer(timer); - } - - private class TestJob<T> extends PcapJob<T> { - - private final Job mrJob; - - public TestJob(Job mrJob) { - this.mrJob = mrJob; - } - - @Override - public Job createJob(Optional<String> jobName, - Path basePath, - Path outputPath, - long beginNS, - long endNS, - int numReducers, - T fields, - Configuration conf, - FileSystem fs, - PcapFilterConfigurator<T> filterImpl) throws IOException { - return mrJob; - } - } - - private class TestTimer extends Timer { - - private TimerTask task; - - @Override - public void scheduleAtFixedRate(TimerTask task, long delay, long period) { - this.task = task; - } - - public void updateJobStatus() { - task.run(); - } - - } - - @Test - public void partition_gives_value_in_range() throws Exception { - long start = 1473897600000000000L; - long end = TimestampConverters.MILLISECONDS.toNanoseconds(1473995927455L); - Configuration conf = new Configuration(); - conf.set(PcapJob.START_TS_CONF, toUnsignedString(start)); - conf.set(PcapJob.END_TS_CONF, toUnsignedString(end)); - conf.set(PcapJob.WIDTH_CONF, "" + PcapJob.findWidth(start, end, 10)); - PcapJob.PcapPartitioner partitioner = new PcapJob.PcapPartitioner(); - partitioner.setConf(conf); - Assert.assertThat("Partition not in range", - partitioner.getPartition(new LongWritable(1473978789181189000L), new BytesWritable(), 10), - equalTo(8)); - } - - @Test - public void job_succeeds_synchronously() throws Exception { - pageableResult = new PcapPages( - Arrays.asList(new Path("1.txt"), new Path("2.txt"), new Path("3.txt"))); - when(finalizer.finalizeJob(any())).thenReturn(pageableResult); - when(mrJob.isComplete()).thenReturn(true); - when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); - when(mrJob.getStatus()).thenReturn(mrStatus); - Statusable<Path> statusable = testJob.submit(finalizer, config); - timer.updateJobStatus(); - Pageable<Path> results = statusable.get(); - Assert.assertThat(results.getSize(), equalTo(3)); - JobStatus status = statusable.getStatus(); - Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED)); - Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); - Assert.assertThat(status.getJobId(), equalTo(jobIdVal)); - } - - @Test - public void job_fails_synchronously() throws Exception { - when(mrJob.isComplete()).thenReturn(true); - when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.FAILED); - when(mrJob.getStatus()).thenReturn(mrStatus); - Statusable<Path> statusable = testJob.submit(finalizer, config); - timer.updateJobStatus(); - Pageable<Path> results = statusable.get(); - JobStatus status = statusable.getStatus(); - Assert.assertThat(status.getState(), equalTo(State.FAILED)); - Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); - Assert.assertThat(results.getSize(), equalTo(0)); - } - - @Test - public void job_fails_with_killed_status_synchronously() throws Exception { - when(mrJob.isComplete()).thenReturn(true); - when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED); - when(mrJob.getStatus()).thenReturn(mrStatus); - Statusable<Path> statusable = testJob.submit(finalizer, config); - timer.updateJobStatus(); - Pageable<Path> results = statusable.get(); - JobStatus status = statusable.getStatus(); - Assert.assertThat(status.getState(), equalTo(State.KILLED)); - Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); - Assert.assertThat(results.getSize(), equalTo(0)); - } - - @Test - public void job_succeeds_asynchronously() throws Exception { - when(mrJob.isComplete()).thenReturn(true); - when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); - when(mrJob.getStatus()).thenReturn(mrStatus); - Statusable<Path> statusable = testJob.submit(finalizer, config); - timer.updateJobStatus(); - JobStatus status = statusable.getStatus(); - Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED)); - Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); - } - - @Test - public void job_reports_percent_complete() throws Exception { - when(mrJob.isComplete()).thenReturn(false); - when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING); - when(mrJob.getStatus()).thenReturn(mrStatus); - when(mrJob.mapProgress()).thenReturn(0.5f); - when(mrJob.reduceProgress()).thenReturn(0f); - Statusable<Path> statusable = testJob.submit(finalizer, config); - timer.updateJobStatus(); - JobStatus status = statusable.getStatus(); - Assert.assertThat(status.getState(), equalTo(State.RUNNING)); - Assert.assertThat(status.getDescription(), equalTo("map: 50.0%, reduce: 0.0%")); - Assert.assertThat(status.getPercentComplete(), equalTo(25.0)); - when(mrJob.mapProgress()).thenReturn(1.0f); - when(mrJob.reduceProgress()).thenReturn(0.5f); - timer.updateJobStatus(); - status = statusable.getStatus(); - Assert.assertThat(status.getDescription(), equalTo("map: 100.0%, reduce: 50.0%")); - Assert.assertThat(status.getPercentComplete(), equalTo(75.0)); - } - - @Test - public void killing_job_causes_status_to_return_KILLED_state() throws Exception { - when(mrJob.isComplete()).thenReturn(false); - when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING); - when(mrJob.getStatus()).thenReturn(mrStatus); - Statusable<Path> statusable = testJob.submit(finalizer, config); - statusable.kill(); - when(mrJob.isComplete()).thenReturn(true); - when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED); - timer.updateJobStatus(); - JobStatus status = statusable.getStatus(); - Assert.assertThat(status.getState(), equalTo(State.KILLED)); - } - - @Test - public void handles_null_values_with_defaults() throws Exception { - PcapOptions.START_TIME_NS.put(config, null); - PcapOptions.END_TIME_NS.put(config, null); - PcapOptions.NUM_REDUCERS.put(config, null); - PcapOptions.NUM_RECORDS_PER_FILE.put(config, null); - - pageableResult = new PcapPages( - Arrays.asList(new Path("1.txt"), new Path("2.txt"), new Path("3.txt"))); - when(finalizer.finalizeJob(any())).thenReturn(pageableResult); - when(mrJob.isComplete()).thenReturn(true); - when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); - when(mrJob.getStatus()).thenReturn(mrStatus); - Statusable<Path> statusable = testJob.submit(finalizer, config); - timer.updateJobStatus(); - Pageable<Path> results = statusable.get(); - Assert.assertThat(results.getSize(), equalTo(3)); - JobStatus status = statusable.getStatus(); - Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED)); - Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); - Assert.assertThat(status.getJobId(), equalTo(jobIdVal)); - } - -} http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java deleted file mode 100644 index 7e3d55c..0000000 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java +++ /dev/null @@ -1,228 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.pcap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.metron.common.Constants; -import org.apache.metron.pcap.filter.PcapFilter; -import org.apache.metron.pcap.filter.query.QueryPcapFilter; -import org.junit.Assert; -import org.junit.Test; - -import java.util.EnumMap; -import java.util.HashMap; - -public class QueryPcapFilterTest { - - @Test - public void testEmptyQueryFilter() throws Exception { - Configuration config = new Configuration(); - String query = ""; - new QueryPcapFilter.Configurator().addToConfig(query, config); - { - PcapFilter filter = new QueryPcapFilter() { - @Override - protected HashMap<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), 0); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), 1); - }}; - } - }; - filter.configure(config); - Assert.assertTrue(filter.test(null)); - } - } - - @Test - public void testTrivialEquality() throws Exception { - Configuration config = new Configuration(); - String query = "ip_src_addr == 'src_ip' and ip_src_port == 0 and ip_dst_addr == 'dst_ip' and ip_dst_port == 1"; - new QueryPcapFilter.Configurator().addToConfig(query, config); - { - PcapFilter filter = new QueryPcapFilter() { - @Override - protected HashMap<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), 0); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), 1); - }}; - } - }; - filter.configure(config); - Assert.assertTrue(filter.test(null)); - } - } - - @Test - public void testMissingDstAddr() throws Exception { - Configuration config = new Configuration(); - String query = "ip_src_addr == 'src_ip' and ip_src_port == 0 and ip_dst_port == 1"; - new QueryPcapFilter.Configurator().addToConfig(query, config); - { - QueryPcapFilter filter = new QueryPcapFilter() { - @Override - protected HashMap<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), 0); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), 1); - }}; - } - }; - filter.configure(config); - Assert.assertTrue(filter.test(null)); - } - new QueryPcapFilter.Configurator().addToConfig(query, config); - { - QueryPcapFilter filter = new QueryPcapFilter() { - @Override - protected HashMap<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip_no_match"); - put(Constants.Fields.SRC_PORT.getName(), 0); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), 1); - }}; - } - }; - filter.configure(config); - Assert.assertFalse(filter.test(null)); - } - } - - @Test - public void testMissingDstPort() throws Exception { - Configuration config = new Configuration(); - String query = "ip_src_addr == 'src_ip' and ip_src_port == 0 and ip_dst_addr == 'dst_ip'"; - new QueryPcapFilter.Configurator().addToConfig(query, config); - { - QueryPcapFilter filter = new QueryPcapFilter() { - @Override - protected HashMap<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), 0); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), 1); - }}; - } - }; - filter.configure(config); - Assert.assertTrue(filter.test(null)); - } - new QueryPcapFilter.Configurator().addToConfig(query, config); - { - QueryPcapFilter filter = new QueryPcapFilter() { - @Override - protected HashMap<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), 0); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), 100); - }}; - } - }; - filter.configure(config); - Assert.assertTrue(filter.test(null)); - } - new QueryPcapFilter.Configurator().addToConfig(query, config); - { - QueryPcapFilter filter = new QueryPcapFilter() { - @Override - protected HashMap<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), 100); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), 100); - }}; - } - }; - filter.configure(config); - Assert.assertFalse(filter.test(null)); - } - } - - @Test - public void testMissingSrcAddr() throws Exception { - Configuration config = new Configuration(); - String query = "ip_src_port == 0 and ip_dst_addr == 'dst_ip' and ip_dst_port == 1"; - new QueryPcapFilter.Configurator().addToConfig(query, config); - { - QueryPcapFilter filter = new QueryPcapFilter() { - @Override - protected HashMap<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), 0); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), 1); - }}; - } - }; - filter.configure(config); - Assert.assertTrue(filter.test(null)); - } - } - - @Test - public void testMissingSrcPort() throws Exception { - Configuration config = new Configuration(); - String query = "ip_src_addr == 'src_ip' and ip_dst_addr == 'dst_ip' and ip_dst_port == 1"; - new QueryPcapFilter.Configurator().addToConfig(query, config); - { - QueryPcapFilter filter = new QueryPcapFilter() { - @Override - protected HashMap<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), 0); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), 1); - }}; - } - }; - filter.configure(config); - Assert.assertTrue(filter.test(null)); - } - new QueryPcapFilter.Configurator().addToConfig(query, config); - { - QueryPcapFilter filter = new QueryPcapFilter() { - @Override - protected HashMap<String, Object> packetToFields(PacketInfo pi) { - return new HashMap<String, Object>() {{ - put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); - put(Constants.Fields.SRC_PORT.getName(), 100); - put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); - put(Constants.Fields.DST_PORT.getName(), 1); - }}; - } - }; - filter.configure(config); - Assert.assertTrue(filter.test(null)); - } - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java index 1954f1a..314bd85 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java @@ -19,20 +19,19 @@ package org.apache.metron.pcap.filter.fixed; import com.google.common.base.Joiner; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import javax.xml.bind.DatatypeConverter; import org.apache.hadoop.conf.Configuration; import org.apache.metron.common.Constants; -import org.apache.metron.stellar.dsl.MapVariableResolver; -import org.apache.metron.stellar.dsl.VariableResolver; import org.apache.metron.pcap.PacketInfo; import org.apache.metron.pcap.PcapHelper; import org.apache.metron.pcap.filter.PcapFilter; import org.apache.metron.pcap.filter.PcapFilterConfigurator; import org.apache.metron.pcap.filter.PcapFilters; import org.apache.metron.pcap.pattern.ByteArrayMatchingUtil; - -import javax.xml.bind.DatatypeConverter; -import java.util.Map; -import java.util.concurrent.ExecutionException; +import org.apache.metron.stellar.dsl.MapVariableResolver; +import org.apache.metron.stellar.dsl.VariableResolver; public class FixedPcapFilter implements PcapFilter { @@ -48,7 +47,8 @@ public class FixedPcapFilter implements PcapFilter { @Override public String queryToString(Map<String, String> fields) { - return (fields == null ? "" : Joiner.on("_").join(fields.values())); + return (fields == null ? "" : Joiner.on("_").join(fields.values()).replaceAll("\\s", "_") + ); } } http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java index 552a5ae..e7fff16 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java @@ -18,19 +18,18 @@ package org.apache.metron.pcap.filter.query; +import java.util.Map; import org.apache.hadoop.conf.Configuration; -import org.apache.metron.stellar.dsl.Context; -import org.apache.metron.stellar.dsl.MapVariableResolver; -import org.apache.metron.stellar.dsl.StellarFunctions; -import org.apache.metron.stellar.common.StellarPredicateProcessor; -import org.apache.metron.stellar.dsl.VariableResolver; import org.apache.metron.pcap.PacketInfo; import org.apache.metron.pcap.PcapHelper; import org.apache.metron.pcap.filter.PcapFilter; import org.apache.metron.pcap.filter.PcapFilterConfigurator; import org.apache.metron.pcap.filter.PcapFilters; - -import java.util.Map; +import org.apache.metron.stellar.common.StellarPredicateProcessor; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.MapVariableResolver; +import org.apache.metron.stellar.dsl.StellarFunctions; +import org.apache.metron.stellar.dsl.VariableResolver; public class QueryPcapFilter implements PcapFilter { public static final String QUERY_STR_CONFIG = "mql"; @@ -45,9 +44,7 @@ public class QueryPcapFilter implements PcapFilter { @Override public String queryToString(String fields) { return (fields == null ? "" : - fields.trim().replaceAll("\\s", "_") - .replace(".", "-") - .replace("'", "") + fields.trim().replaceAll("\\s", "_") ); } } http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/OutputDirFormatter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/OutputDirFormatter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/OutputDirFormatter.java new file mode 100644 index 0000000..0d464d5 --- /dev/null +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/OutputDirFormatter.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.pcap.mr; + +import com.google.common.base.Joiner; +import java.util.UUID; + +public class OutputDirFormatter { + + public String format(long beginNS, long endNS, String query) { + return sanitize(Joiner.on("_").join(beginNS, endNS, query, UUID.randomUUID().toString())); + } + + private String sanitize(String path) { + return path + .replace(".", "-") + .replace("'", "") + .replace(":", ""); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index 10f31b4..0f5ad4d 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -35,7 +35,6 @@ import java.util.Map; import java.util.Optional; import java.util.Timer; import java.util.TimerTask; -import java.util.UUID; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configurable; @@ -83,6 +82,7 @@ public class PcapJob<T> implements Statusable<Path> { public static final String WIDTH_CONF = "width"; private static final long THREE_SECONDS = 3000; private static final long ONE_SECOND = 1000; + private final OutputDirFormatter outputDirFormatter; private volatile Job mrJob; // store a running MR job reference for async status check private volatile JobStatus jobStatus; // overall job status, including finalization step private Finalizer<Path> finalizer; @@ -187,6 +187,7 @@ public class PcapJob<T> implements Statusable<Path> { public PcapJob() { jobStatus = new JobStatus(); finalResults = new PcapPages(); + outputDirFormatter = new OutputDirFormatter(); timer = new Timer(); statusInterval = THREE_SECONDS; completeCheckInterval = ONE_SECOND; @@ -271,7 +272,7 @@ public class PcapJob<T> implements Statusable<Path> { FileSystem fs, PcapFilterConfigurator<T> filterImpl) throws IOException, ClassNotFoundException, InterruptedException { - String outputDirName = Joiner.on("_").join(beginNS, endNS, filterImpl.queryToString(fields), UUID.randomUUID().toString()); + String outputDirName = outputDirFormatter.format(beginNS, endNS, filterImpl.queryToString(fields)); if(LOG.isDebugEnabled()) { DateFormat format = SimpleDateFormat.getDateTimeInstance(SimpleDateFormat.LONG , SimpleDateFormat.LONG http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java index af2afd3..b32f23f 100644 --- a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java +++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java @@ -18,14 +18,17 @@ package org.apache.metron.pcap.filter.fixed; +import static org.hamcrest.CoreMatchers.equalTo; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; import org.apache.metron.common.Constants; +import org.apache.metron.pcap.PacketInfo; import org.junit.Assert; import org.junit.Test; -import java.util.LinkedHashMap; - -import static org.hamcrest.CoreMatchers.equalTo; - public class FixedPcapFilterTest { @Test @@ -66,4 +69,264 @@ public class FixedPcapFilterTest { } } + @Test + public void testTrivialEquality() throws Exception { + Configuration config = new Configuration(); + final Map<String, String> fields = new HashMap<String, String>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), "0"); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), "1"); + put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false"); + }}; + new FixedPcapFilter.Configurator().addToConfig(fields, config); + { + FixedPcapFilter filter = new FixedPcapFilter() { + @Override + protected Map<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + } + + @Test + public void testReverseTraffic() throws Exception { + Configuration config = new Configuration(); + final Map<String, String> fields = new HashMap<String, String>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), "0"); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), "1"); + put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true"); + }}; + new FixedPcapFilter.Configurator().addToConfig(fields, config); + { + FixedPcapFilter filter = new FixedPcapFilter() { + @Override + protected Map<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + new FixedPcapFilter.Configurator().addToConfig(fields, config); + { + FixedPcapFilter filter = new FixedPcapFilter() { + @Override + protected Map<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "dst_ip"); + put(Constants.Fields.SRC_PORT.getName(), 1); + put(Constants.Fields.DST_ADDR.getName(), "src_ip"); + put(Constants.Fields.DST_PORT.getName(), 0); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + new FixedPcapFilter.Configurator().addToConfig(fields, config); + { + FixedPcapFilter filter = new FixedPcapFilter() { + @Override + protected Map<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "dst_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "src_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); + }}; + } + }; + filter.configure(config); + Assert.assertFalse(filter.test(null)); + } + } + + @Test + public void testMissingDstAddr() throws Exception { + Configuration config = new Configuration(); + final HashMap<String, String> fields = new HashMap<String, String>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), "0"); + put(Constants.Fields.DST_PORT.getName(), "1"); + put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false"); + }}; + new FixedPcapFilter.Configurator().addToConfig(fields, config); + { + FixedPcapFilter filter = new FixedPcapFilter() { + @Override + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + new FixedPcapFilter.Configurator().addToConfig(fields, config); + { + FixedPcapFilter filter = new FixedPcapFilter() { + @Override + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip1"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); + }}; + } + }; + filter.configure(config); + Assert.assertFalse(filter.test(null)); + } + } + + @Test + public void testMissingDstPort() throws Exception { + Configuration config = new Configuration(); + final HashMap<String, String> fields = new HashMap<String, String>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), "0"); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false"); + }}; + new FixedPcapFilter.Configurator().addToConfig(fields, config); + { + FixedPcapFilter filter = new FixedPcapFilter() { + @Override + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + new FixedPcapFilter.Configurator().addToConfig(fields, config); + { + FixedPcapFilter filter = new FixedPcapFilter() { + @Override + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 100); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + new FixedPcapFilter.Configurator().addToConfig(fields, config); + { + FixedPcapFilter filter = new FixedPcapFilter() { + @Override + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 100); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 100); + }}; + } + }; + filter.configure(config); + Assert.assertFalse(filter.test(null)); + } + } + + @Test + public void testMissingSrcAddr() throws Exception { + Configuration config = new Configuration(); + final HashMap<String, String> fields = new HashMap<String, String>() {{ + put(Constants.Fields.SRC_PORT.getName(), "0"); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), "1"); + put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false"); + }}; + new FixedPcapFilter.Configurator().addToConfig(fields, config); + { + FixedPcapFilter filter = new FixedPcapFilter() { + @Override + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + } + + @Test + public void testMissingSrcPort() throws Exception { + Configuration config = new Configuration(); + final HashMap<String, String> fields = new HashMap<String, String>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), "1"); + put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false"); + }}; + new FixedPcapFilter.Configurator().addToConfig(fields, config); + { + FixedPcapFilter filter = new FixedPcapFilter() { + @Override + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + new FixedPcapFilter.Configurator().addToConfig(fields, config); + { + FixedPcapFilter filter = new FixedPcapFilter() { + @Override + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 100); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java index 061066e..2724e06 100644 --- a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java +++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java @@ -18,18 +18,23 @@ package org.apache.metron.pcap.filter.query; +import static org.hamcrest.CoreMatchers.equalTo; + +import java.util.HashMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.metron.common.Constants; +import org.apache.metron.pcap.PacketInfo; +import org.apache.metron.pcap.filter.PcapFilter; import org.junit.Assert; import org.junit.Test; -import static org.hamcrest.CoreMatchers.equalTo; - public class QueryPcapFilterTest { @Test public void string_representation_of_query_gets_formatted() throws Exception { String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'"; String actual = new QueryPcapFilter.Configurator().queryToString(query); - String expected = "ip_src_addr_==_srcIp_and_ip_src_port_==_80_and_ip_dst_addr_==_dstIp_and_ip_dst_port_==_100_and_protocol_==_protocol"; + String expected = "ip_src_addr_==_'srcIp'_and_ip_src_port_==_'80'_and_ip_dst_addr_==_'dstIp'_and_ip_dst_port_==_'100'_and_protocol_==_'protocol'"; Assert.assertThat("string representation did not match", actual, equalTo(expected)); } @@ -55,4 +60,200 @@ public class QueryPcapFilterTest { } } + @Test + public void testEmptyQueryFilter() throws Exception { + Configuration config = new Configuration(); + String query = ""; + new QueryPcapFilter.Configurator().addToConfig(query, config); + { + PcapFilter filter = new QueryPcapFilter() { + @Override + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + } + + @Test + public void testTrivialEquality() throws Exception { + Configuration config = new Configuration(); + String query = "ip_src_addr == 'src_ip' and ip_src_port == 0 and ip_dst_addr == 'dst_ip' and ip_dst_port == 1"; + new QueryPcapFilter.Configurator().addToConfig(query, config); + { + PcapFilter filter = new QueryPcapFilter() { + @Override + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + } + + @Test + public void testMissingDstAddr() throws Exception { + Configuration config = new Configuration(); + String query = "ip_src_addr == 'src_ip' and ip_src_port == 0 and ip_dst_port == 1"; + new QueryPcapFilter.Configurator().addToConfig(query, config); + { + QueryPcapFilter filter = new QueryPcapFilter() { + @Override + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + new QueryPcapFilter.Configurator().addToConfig(query, config); + { + QueryPcapFilter filter = new QueryPcapFilter() { + @Override + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip_no_match"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); + }}; + } + }; + filter.configure(config); + Assert.assertFalse(filter.test(null)); + } + } + + @Test + public void testMissingDstPort() throws Exception { + Configuration config = new Configuration(); + String query = "ip_src_addr == 'src_ip' and ip_src_port == 0 and ip_dst_addr == 'dst_ip'"; + new QueryPcapFilter.Configurator().addToConfig(query, config); + { + QueryPcapFilter filter = new QueryPcapFilter() { + @Override + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + new QueryPcapFilter.Configurator().addToConfig(query, config); + { + QueryPcapFilter filter = new QueryPcapFilter() { + @Override + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 100); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + new QueryPcapFilter.Configurator().addToConfig(query, config); + { + QueryPcapFilter filter = new QueryPcapFilter() { + @Override + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 100); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 100); + }}; + } + }; + filter.configure(config); + Assert.assertFalse(filter.test(null)); + } + } + + @Test + public void testMissingSrcAddr() throws Exception { + Configuration config = new Configuration(); + String query = "ip_src_port == 0 and ip_dst_addr == 'dst_ip' and ip_dst_port == 1"; + new QueryPcapFilter.Configurator().addToConfig(query, config); + { + QueryPcapFilter filter = new QueryPcapFilter() { + @Override + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + } + + @Test + public void testMissingSrcPort() throws Exception { + Configuration config = new Configuration(); + String query = "ip_src_addr == 'src_ip' and ip_dst_addr == 'dst_ip' and ip_dst_port == 1"; + new QueryPcapFilter.Configurator().addToConfig(query, config); + { + QueryPcapFilter filter = new QueryPcapFilter() { + @Override + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 0); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + new QueryPcapFilter.Configurator().addToConfig(query, config); + { + QueryPcapFilter filter = new QueryPcapFilter() { + @Override + protected HashMap<String, Object> packetToFields(PacketInfo pi) { + return new HashMap<String, Object>() {{ + put(Constants.Fields.SRC_ADDR.getName(), "src_ip"); + put(Constants.Fields.SRC_PORT.getName(), 100); + put(Constants.Fields.DST_ADDR.getName(), "dst_ip"); + put(Constants.Fields.DST_PORT.getName(), 1); + }}; + } + }; + filter.configure(config); + Assert.assertTrue(filter.test(null)); + } + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/OutputDirFormatterTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/OutputDirFormatterTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/OutputDirFormatterTest.java new file mode 100644 index 0000000..ae1cda4 --- /dev/null +++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/OutputDirFormatterTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.pcap.mr; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.assertThat; + +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.fs.Path; +import org.apache.metron.common.utils.timestamp.TimestampConverters; +import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; +import org.apache.metron.pcap.filter.query.QueryPcapFilter; +import org.junit.Test; + +public class OutputDirFormatterTest { + + @Test + public void formats_directory_name_for_query_filter_types() throws Exception { + long beginNS = TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis()); + long endNS = TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis()); + String query = "ip_dst_addr == '207.28.210.1' and protocol == 'PROTOCOL: ICMP(1)"; + String queryFilterString = new QueryPcapFilter.Configurator().queryToString(query); + OutputDirFormatter formatter = new OutputDirFormatter(); + String actual = formatter.format(beginNS, endNS, queryFilterString); + assertThat("Formatted directory names did not match.", actual, containsString("_ip_dst_addr_==_207-28-210-1_and_protocol_==_PROTOCOL_ICMP(1)_")); + // no URI exception should be thrown with dir name + new Path(actual); + } + + @Test + public void formats_directory_name_for_fixed_filter_types() throws Exception { + long beginNS = TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis()); + long endNS = TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis()); + Map<String, String> fields = new HashMap<>(); + fields.put("ip_src_address", "207.28.210.1"); + fields.put("protocol", "PROTOCOL: ICMP(1)"); + String fixedFilterString = new FixedPcapFilter.Configurator().queryToString(fields); + OutputDirFormatter formatter = new OutputDirFormatter(); + String actual = formatter.format(beginNS, endNS, fixedFilterString); + assertThat("Formatted directory names did not match.", actual, containsString("PROTOCOL_ICMP(1)_207-28-210-1")); + // no URI exception should be thrown with dir name + new Path(actual); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/73dc63e6/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/PcapJobTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/PcapJobTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/PcapJobTest.java new file mode 100644 index 0000000..0f555d0 --- /dev/null +++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/PcapJobTest.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.pcap.mr; + +import static java.lang.Long.toUnsignedString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Timer; +import java.util.TimerTask; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.metron.common.utils.timestamp.TimestampConverters; +import org.apache.metron.job.Finalizer; +import org.apache.metron.job.JobStatus; +import org.apache.metron.job.JobStatus.State; +import org.apache.metron.job.Pageable; +import org.apache.metron.job.Statusable; +import org.apache.metron.pcap.PcapPages; +import org.apache.metron.pcap.config.FixedPcapConfig; +import org.apache.metron.pcap.config.PcapOptions; +import org.apache.metron.pcap.filter.PcapFilterConfigurator; +import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class PcapJobTest { + + @Mock + private Job mrJob; + @Mock + private org.apache.hadoop.mapreduce.JobStatus mrStatus; + @Mock + private JobID jobId; + @Mock + private Finalizer<Path> finalizer; + private TestTimer timer; + private Pageable<Path> pageableResult; + private FixedPcapConfig config; + private Configuration hadoopConfig; + private FileSystem fileSystem; + private String jobIdVal = "job_abc_123"; + private Path basePath; + private Path baseOutPath; + private long startTime; + private long endTime; + private int numReducers; + private int numRecordsPerFile; + private Path finalOutputPath; + private Map<String, String> fixedFields; + private PcapJob<Map<String, String>> testJob; + + @Before + public void setup() throws IOException { + MockitoAnnotations.initMocks(this); + basePath = new Path("basepath"); + baseOutPath = new Path("outpath"); + startTime = 100; + endTime = 200; + numReducers = 5; + numRecordsPerFile = 5; + fixedFields = new HashMap<>(); + fixedFields.put("ip_src_addr", "192.168.1.1"); + hadoopConfig = new Configuration(); + fileSystem = FileSystem.get(hadoopConfig); + finalOutputPath = new Path("finaloutpath"); + when(jobId.toString()).thenReturn(jobIdVal); + when(mrStatus.getJobID()).thenReturn(jobId); + when(mrJob.getJobID()).thenReturn(jobId); + pageableResult = new PcapPages(); + timer = new TestTimer(); + // handles setting the file name prefix under the hood + config = new FixedPcapConfig(clock -> "clockprefix"); + PcapOptions.HADOOP_CONF.put(config, hadoopConfig); + PcapOptions.FILESYSTEM.put(config, FileSystem.get(hadoopConfig)); + PcapOptions.BASE_PATH.put(config, basePath); + PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, baseOutPath); + PcapOptions.START_TIME_NS.put(config, startTime); + PcapOptions.END_TIME_NS.put(config, endTime); + PcapOptions.NUM_REDUCERS.put(config, numReducers); + PcapOptions.FIELDS.put(config, fixedFields); + PcapOptions.FILTER_IMPL.put(config, new FixedPcapFilter.Configurator()); + PcapOptions.NUM_RECORDS_PER_FILE.put(config, numRecordsPerFile); + PcapOptions.FINAL_OUTPUT_PATH.put(config, finalOutputPath); + testJob = new TestJob<>(mrJob); + testJob.setStatusInterval(1); + testJob.setCompleteCheckInterval(1); + testJob.setTimer(timer); + } + + private class TestJob<T> extends PcapJob<T> { + + private final Job mrJob; + + public TestJob(Job mrJob) { + this.mrJob = mrJob; + } + + @Override + public Job createJob(Optional<String> jobName, + Path basePath, + Path outputPath, + long beginNS, + long endNS, + int numReducers, + T fields, + Configuration conf, + FileSystem fs, + PcapFilterConfigurator<T> filterImpl) throws IOException { + return mrJob; + } + } + + private class TestTimer extends Timer { + + private TimerTask task; + + @Override + public void scheduleAtFixedRate(TimerTask task, long delay, long period) { + this.task = task; + } + + public void updateJobStatus() { + task.run(); + } + + } + + @Test + public void partition_gives_value_in_range() throws Exception { + long start = 1473897600000000000L; + long end = TimestampConverters.MILLISECONDS.toNanoseconds(1473995927455L); + Configuration conf = new Configuration(); + conf.set(PcapJob.START_TS_CONF, toUnsignedString(start)); + conf.set(PcapJob.END_TS_CONF, toUnsignedString(end)); + conf.set(PcapJob.WIDTH_CONF, "" + PcapJob.findWidth(start, end, 10)); + PcapJob.PcapPartitioner partitioner = new PcapJob.PcapPartitioner(); + partitioner.setConf(conf); + Assert.assertThat("Partition not in range", + partitioner.getPartition(new LongWritable(1473978789181189000L), new BytesWritable(), 10), + equalTo(8)); + } + + @Test + public void job_succeeds_synchronously() throws Exception { + pageableResult = new PcapPages( + Arrays.asList(new Path("1.txt"), new Path("2.txt"), new Path("3.txt"))); + when(finalizer.finalizeJob(any())).thenReturn(pageableResult); + when(mrJob.isComplete()).thenReturn(true); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); + when(mrJob.getStatus()).thenReturn(mrStatus); + Statusable<Path> statusable = testJob.submit(finalizer, config); + timer.updateJobStatus(); + Pageable<Path> results = statusable.get(); + Assert.assertThat(results.getSize(), equalTo(3)); + JobStatus status = statusable.getStatus(); + Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED)); + Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); + Assert.assertThat(status.getJobId(), equalTo(jobIdVal)); + } + + @Test + public void job_fails_synchronously() throws Exception { + when(mrJob.isComplete()).thenReturn(true); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.FAILED); + when(mrJob.getStatus()).thenReturn(mrStatus); + Statusable<Path> statusable = testJob.submit(finalizer, config); + timer.updateJobStatus(); + Pageable<Path> results = statusable.get(); + JobStatus status = statusable.getStatus(); + Assert.assertThat(status.getState(), equalTo(State.FAILED)); + Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); + Assert.assertThat(results.getSize(), equalTo(0)); + } + + @Test + public void job_fails_with_killed_status_synchronously() throws Exception { + when(mrJob.isComplete()).thenReturn(true); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED); + when(mrJob.getStatus()).thenReturn(mrStatus); + Statusable<Path> statusable = testJob.submit(finalizer, config); + timer.updateJobStatus(); + Pageable<Path> results = statusable.get(); + JobStatus status = statusable.getStatus(); + Assert.assertThat(status.getState(), equalTo(State.KILLED)); + Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); + Assert.assertThat(results.getSize(), equalTo(0)); + } + + @Test + public void job_succeeds_asynchronously() throws Exception { + when(mrJob.isComplete()).thenReturn(true); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); + when(mrJob.getStatus()).thenReturn(mrStatus); + Statusable<Path> statusable = testJob.submit(finalizer, config); + timer.updateJobStatus(); + JobStatus status = statusable.getStatus(); + Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED)); + Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); + } + + @Test + public void job_reports_percent_complete() throws Exception { + when(mrJob.isComplete()).thenReturn(false); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING); + when(mrJob.getStatus()).thenReturn(mrStatus); + when(mrJob.mapProgress()).thenReturn(0.5f); + when(mrJob.reduceProgress()).thenReturn(0f); + Statusable<Path> statusable = testJob.submit(finalizer, config); + timer.updateJobStatus(); + JobStatus status = statusable.getStatus(); + Assert.assertThat(status.getState(), equalTo(State.RUNNING)); + Assert.assertThat(status.getDescription(), equalTo("map: 50.0%, reduce: 0.0%")); + Assert.assertThat(status.getPercentComplete(), equalTo(25.0)); + when(mrJob.mapProgress()).thenReturn(1.0f); + when(mrJob.reduceProgress()).thenReturn(0.5f); + timer.updateJobStatus(); + status = statusable.getStatus(); + Assert.assertThat(status.getDescription(), equalTo("map: 100.0%, reduce: 50.0%")); + Assert.assertThat(status.getPercentComplete(), equalTo(75.0)); + } + + @Test + public void killing_job_causes_status_to_return_KILLED_state() throws Exception { + when(mrJob.isComplete()).thenReturn(false); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING); + when(mrJob.getStatus()).thenReturn(mrStatus); + Statusable<Path> statusable = testJob.submit(finalizer, config); + statusable.kill(); + when(mrJob.isComplete()).thenReturn(true); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED); + timer.updateJobStatus(); + JobStatus status = statusable.getStatus(); + Assert.assertThat(status.getState(), equalTo(State.KILLED)); + } + + @Test + public void handles_null_values_with_defaults() throws Exception { + PcapOptions.START_TIME_NS.put(config, null); + PcapOptions.END_TIME_NS.put(config, null); + PcapOptions.NUM_REDUCERS.put(config, null); + PcapOptions.NUM_RECORDS_PER_FILE.put(config, null); + + pageableResult = new PcapPages( + Arrays.asList(new Path("1.txt"), new Path("2.txt"), new Path("3.txt"))); + when(finalizer.finalizeJob(any())).thenReturn(pageableResult); + when(mrJob.isComplete()).thenReturn(true); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); + when(mrJob.getStatus()).thenReturn(mrStatus); + Statusable<Path> statusable = testJob.submit(finalizer, config); + timer.updateJobStatus(); + Pageable<Path> results = statusable.get(); + Assert.assertThat(results.getSize(), equalTo(3)); + JobStatus status = statusable.getStatus(); + Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED)); + Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); + Assert.assertThat(status.getJobId(), equalTo(jobIdVal)); + } + +}