METRON-958: PCAP Query job throws an exception when no files are returned by the time range query (closes apache/metron#593)
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/e2197316 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/e2197316 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/e2197316 Branch: refs/heads/Metron_0.4.0 Commit: e2197316df5754aba601134ecce19b12092b6f37 Parents: 356881a Author: mmiklavc <michael.miklav...@gmail.com> Authored: Sun May 21 21:08:45 2017 -0400 Committer: cstella <ceste...@gmail.com> Committed: Sun May 21 21:08:45 2017 -0400 ---------------------------------------------------------------------- .../org/apache/metron/pcap/PcapJobTest.java | 110 +------------- .../PcapTopologyIntegrationTest.java | 123 +++++++-------- .../apache/metron/pcap/PcapFilenameHelper.java | 77 ++++++++++ .../java/org/apache/metron/pcap/PcapHelper.java | 35 ++--- .../java/org/apache/metron/pcap/mr/PcapJob.java | 105 +++++-------- .../metron/pcap/utils/FileFilterUtil.java | 138 +++++++++++++++++ .../metron/pcap/PcapFilenameHelperTest.java | 75 ++++++++++ .../metron/pcap/mr/FileFilterUtilTest.java | 150 +++++++++++++++++++ 8 files changed, 568 insertions(+), 245 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java index 81725d8..3536a7e 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,10 +18,10 @@ package org.apache.metron.pcap; -import com.google.common.collect.Iterables; +import static java.lang.Long.toUnsignedString; +import static org.hamcrest.CoreMatchers.equalTo; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.metron.common.utils.timestamp.TimestampConverters; @@ -29,106 +29,9 @@ import org.apache.metron.pcap.mr.PcapJob; import org.junit.Assert; import org.junit.Test; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import static java.lang.Long.toUnsignedString; -import static org.hamcrest.CoreMatchers.equalTo; - public class PcapJobTest { @Test - public void test_getPaths_NoFiles() throws Exception { - PcapJob job; - { - final List<Path> inputFiles = new ArrayList<Path>() {{ - }}; - job = new PcapJob() { - @Override - protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException { - return inputFiles; - } - }; - Iterable<String> paths = job.getPaths(null, null, 0, 1000); - Assert.assertTrue(Iterables.isEmpty(paths)); - } - } - - @Test - public void test_getPaths_leftEdge() throws Exception { - PcapJob job; - { - final List<Path> inputFiles = new ArrayList<Path>() {{ - add(new Path("/apps/metron/pcap/pcap_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094")); - add(new Path("/apps/metron/pcap/pcap_pcap_1561589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094")); - }}; - job = new PcapJob() { - @Override - protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException { - return inputFiles; - } - 
}; - Iterable<String> paths = job.getPaths(null, null, 0, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis())); - Assert.assertEquals(1, Iterables.size(paths)); - } - } - - @Test - public void test_getPaths_rightEdge() throws Exception { - PcapJob job; - { - final List<Path> inputFiles = new ArrayList<Path>() {{ - add(new Path("/apps/metron/pcap/pcap0_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094")); - add(new Path("/apps/metron/pcap/pcap1_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094")); - }}; - job = new PcapJob() { - @Override - protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException { - return inputFiles; - } - }; - Iterable<String> paths = job.getPaths(null, null, 1461589333993573000L - 1L, 1461589333993573000L + 1L); - Assert.assertEquals(2, Iterables.size(paths)); - } - { - final List<Path> inputFiles = new ArrayList<Path>() {{ - add(new Path("/apps/metron/pcap/pcap0_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094")); - add(new Path("/apps/metron/pcap/pcap1_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094")); - add(new Path("/apps/metron/pcap/pcap1_pcap_1461589334993573000_0_73686171-64a1-46e5-9e67-66cf603fb094")); - }}; - job = new PcapJob() { - @Override - protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException { - return inputFiles; - } - }; - Iterable<String> paths = job.getPaths(null, null, 1461589334993573000L - 1L, 1461589334993573000L + 1L); - Assert.assertEquals(2, Iterables.size(paths)); - } - } - - @Test - public void test_getPaths_bothEdges() throws Exception { - PcapJob job; - { - final List<Path> inputFiles = new ArrayList<Path>() {{ - add(new Path("/apps/metron/pcap/pcap_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094")); - add(new Path("/apps/metron/pcap/pcap_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094")); - add(new 
Path("/apps/metron/pcap/pcap1_pcap_1461589334993573000_0_73686171-64a1-46e5-9e67-66cf603fb094")); - }}; - job = new PcapJob() { - @Override - protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException { - return inputFiles; - } - }; - Iterable<String> paths = job.getPaths(null, null, 0, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis())); - Assert.assertEquals(3, Iterables.size(paths)); - } - } - - @Test public void partition_gives_value_in_range() throws Exception { long start = 1473897600000000000L; long end = TimestampConverters.MILLISECONDS.toNanoseconds(1473995927455L); @@ -138,6 +41,9 @@ public class PcapJobTest { conf.set(PcapJob.WIDTH_CONF, "" + PcapJob.findWidth(start, end, 10)); PcapJob.PcapPartitioner partitioner = new PcapJob.PcapPartitioner(); partitioner.setConf(conf); - Assert.assertThat("Partition not in range", partitioner.getPartition(new LongWritable(1473978789181189000L), new BytesWritable(), 10), equalTo(8)); + Assert.assertThat("Partition not in range", + partitioner.getPartition(new LongWritable(1473978789181189000L), new BytesWritable(), 10), + equalTo(8)); } + } http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java index d6d54dc..7d1dba8 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java @@ -23,6 +23,17 @@ import com.google.common.base.Predicate; import 
com.google.common.collect.Collections2; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import javax.annotation.Nullable; import kafka.consumer.ConsumerIterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -55,13 +66,6 @@ import org.json.simple.JSONObject; import org.junit.Assert; import org.junit.Test; -import javax.annotation.Nullable; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FilenameFilter; -import java.io.IOException; -import java.util.*; - public class PcapTopologyIntegrationTest { final static String KAFKA_TOPIC = "pcap"; private static String BASE_DIR = "pcap"; @@ -69,22 +73,7 @@ public class PcapTopologyIntegrationTest { private static String QUERY_DIR = BASE_DIR + "/query"; private String topologiesDir = "src/main/flux"; private String targetDir = "target"; - private File getOutDir(String targetDir) { - File outDir = new File(new File(targetDir), DATA_DIR); - if (!outDir.exists()) { - outDir.mkdirs(); - } - - return outDir; - } - private File getQueryDir(String targetDir) { - File outDir = new File(new File(targetDir), QUERY_DIR); - if (!outDir.exists()) { - outDir.mkdirs(); - } - return outDir; - } private static void clearOutDir(File outDir) { for(File f : outDir.listFiles()) { f.delete(); @@ -100,43 +89,6 @@ public class PcapTopologyIntegrationTest { }).length; } - private static Iterable<Map.Entry<byte[], byte[]>> readPcaps(Path pcapFile, boolean withHeaders) throws IOException { - SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(), - SequenceFile.Reader.file(pcapFile) - ); - List<Map.Entry<byte[], byte[]> > ret = new ArrayList<>(); - 
IntWritable key = new IntWritable(); - BytesWritable value = new BytesWritable(); - while (reader.next(key, value)) { - byte[] pcapWithHeader = value.copyBytes(); - //if you are debugging and want the hex dump of the packets, uncomment the following: - - //for(byte b : pcapWithHeader) { - // System.out.print(String.format("%02x", b)); - //} - //System.out.println(""); - - long calculatedTs = PcapHelper.getTimestamp(pcapWithHeader); - { - List<PacketInfo> info = PcapHelper.toPacketInfo(pcapWithHeader); - for(PacketInfo pi : info) { - Assert.assertEquals(calculatedTs, pi.getPacketTimeInNanos()); - //IF you are debugging and want to see the packets, uncomment the following. - //System.out.println( Long.toUnsignedString(calculatedTs) + " => " + pi.getJsonDoc()); - } - } - if(withHeaders) { - ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapWithHeader)); - } - else { - byte[] pcapRaw = new byte[pcapWithHeader.length - PcapHelper.GLOBAL_HEADER_SIZE - PcapHelper.PACKET_HEADER_SIZE]; - System.arraycopy(pcapWithHeader, PcapHelper.GLOBAL_HEADER_SIZE + PcapHelper.PACKET_HEADER_SIZE, pcapRaw, 0, pcapRaw.length); - ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapRaw)); - } - } - return Iterables.limit(ret, 2*(ret.size()/2)); - } - @Test public void testTimestampInPacket() throws Exception { testTopology(new Function<Properties, Void>() { @@ -561,6 +513,59 @@ public class PcapTopologyIntegrationTest { } } + private File getOutDir(String targetDir) { + File outDir = new File(new File(targetDir), DATA_DIR); + if (!outDir.exists()) { + outDir.mkdirs(); + } + return outDir; + } + + private File getQueryDir(String targetDir) { + File outDir = new File(new File(targetDir), QUERY_DIR); + if (!outDir.exists()) { + outDir.mkdirs(); + } + return outDir; + } + + private static Iterable<Map.Entry<byte[], byte[]>> readPcaps(Path pcapFile, boolean withHeaders) throws IOException { + SequenceFile.Reader reader = new 
SequenceFile.Reader(new Configuration(), + SequenceFile.Reader.file(pcapFile) + ); + List<Map.Entry<byte[], byte[]> > ret = new ArrayList<>(); + IntWritable key = new IntWritable(); + BytesWritable value = new BytesWritable(); + while (reader.next(key, value)) { + byte[] pcapWithHeader = value.copyBytes(); + //if you are debugging and want the hex dump of the packets, uncomment the following: + + //for(byte b : pcapWithHeader) { + // System.out.print(String.format("%02x", b)); + //} + //System.out.println(""); + + long calculatedTs = PcapHelper.getTimestamp(pcapWithHeader); + { + List<PacketInfo> info = PcapHelper.toPacketInfo(pcapWithHeader); + for(PacketInfo pi : info) { + Assert.assertEquals(calculatedTs, pi.getPacketTimeInNanos()); + //IF you are debugging and want to see the packets, uncomment the following. + //System.out.println( Long.toUnsignedString(calculatedTs) + " => " + pi.getJsonDoc()); + } + } + if(withHeaders) { + ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapWithHeader)); + } + else { + byte[] pcapRaw = new byte[pcapWithHeader.length - PcapHelper.GLOBAL_HEADER_SIZE - PcapHelper.PACKET_HEADER_SIZE]; + System.arraycopy(pcapWithHeader, PcapHelper.GLOBAL_HEADER_SIZE + PcapHelper.PACKET_HEADER_SIZE, pcapRaw, 0, pcapRaw.length); + ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), pcapRaw)); + } + } + return Iterables.limit(ret, 2*(ret.size()/2)); + } + public static void assertInOrder(Iterable<byte[]> packets) { long previous = 0; for(byte[] packet : packets) { http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFilenameHelper.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFilenameHelper.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFilenameHelper.java new file mode 100644 index 
0000000..a6f8546 --- /dev/null +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapFilenameHelper.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.pcap; + +import java.util.Arrays; + +/** + * Expects files in the following format + * pcap_$TOPIC_$TS_$PARTITION_$UUID + */ +public class PcapFilenameHelper { + + public static final String PREFIX = "pcap_"; + + /** + * Extract kafka topic from pcap filename. Resilient to underscores and hyphens in the kafka + * topic name when splitting the value. 
+ */ + public static String getKafkaTopic(String pcapFilename) { + String[] tokens = stripPrefix(pcapFilename).split("_"); + return String.join("_", Arrays.copyOfRange(tokens, 0, tokens.length - 3)); + } + + private static String stripPrefix(String s) { + return s.substring(PREFIX.length()); + } + + /** + * Gets unsigned long timestamp from the PCAP filename + * + * @return timestamp, or null if unable to parse + */ + public static Long getTimestamp(String pcapFilename) { + String[] tokens = stripPrefix(pcapFilename).split("_"); + try { + return Long.parseUnsignedLong(tokens[tokens.length - 3]); + } catch (NumberFormatException e) { + return null; + } + } + + /** + * Gets Kafka partition number from the PCAP filename + * + * @return partition, or null if unable to parse + */ + public static Integer getKafkaPartition(String pcapFilename) { + String[] tokens = stripPrefix(pcapFilename).split("_"); + try { + return Integer.parseInt(tokens[tokens.length - 2]); + } catch (NumberFormatException e) { + return null; + } + } + + public static String getUUID(String pcapFilename) { + String[] tokens = stripPrefix(pcapFilename).split("_"); + return tokens[tokens.length - 1]; + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java index e1ad3ca..bb7d9f0 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java @@ -19,8 +19,12 @@ package org.apache.metron.pcap; import com.google.common.base.Joiner; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; +import java.io.EOFException; +import 
java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.hadoop.hbase.util.Bytes; import org.apache.log4j.Logger; import org.apache.metron.spout.pcap.Endianness; @@ -37,13 +41,6 @@ import org.krakenapps.pcap.packet.PcapPacket; import org.krakenapps.pcap.util.Buffer; import org.krakenapps.pcap.util.ByteOrderConverter; -import java.io.EOFException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - public class PcapHelper { public static final int PACKET_HEADER_SIZE = 4*Integer.BYTES; @@ -72,16 +69,6 @@ public class PcapHelper { } }; - public static Long getTimestamp(String filename) { - try { - return Long.parseUnsignedLong(Iterables.get(Splitter.on('_').split(filename), 2)); - } - catch(Exception e) { - //something went wrong here. - return null; - } - } - /** * * @param topic @@ -363,4 +350,14 @@ public class PcapHelper { } return messages; } + + public static boolean greaterThanOrEqualTo(long a, long b) { + return Long.compareUnsigned(a, b) >= 0; + } + + public static boolean lessThanOrEqualTo(long a, long b) { + return Long.compareUnsigned(a, b) <= 0; + } + } + http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index 8d40e5f..62f9844 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -18,7 +18,20 @@ package org.apache.metron.pcap.mr; +import static org.apache.metron.pcap.PcapHelper.greaterThanOrEqualTo; +import static 
org.apache.metron.pcap.PcapHelper.lessThanOrEqualTo; + import com.google.common.base.Joiner; +import java.io.IOException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.UUID; +import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -33,22 +46,19 @@ import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.log4j.Logger; import org.apache.metron.common.hadoop.SequenceFileIterable; import org.apache.metron.pcap.PacketInfo; import org.apache.metron.pcap.PcapHelper; import org.apache.metron.pcap.filter.PcapFilter; import org.apache.metron.pcap.filter.PcapFilterConfigurator; import org.apache.metron.pcap.filter.PcapFilters; - -import java.io.IOException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.*; -import java.util.stream.Stream; +import org.apache.metron.pcap.utils.FileFilterUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PcapJob { - private static final Logger LOG = Logger.getLogger(PcapJob.class); + + private static final Logger LOG = LoggerFactory.getLogger(PcapJob.class); public static final String START_TS_CONF = "start_ts"; public static final String END_TS_CONF = "end_ts"; public static final String WIDTH_CONF = "width"; @@ -110,7 +120,7 @@ public class PcapJob { @Override protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException { - if (Long.compareUnsigned(key.get(), start) >= 0 && Long.compareUnsigned(key.get(), end) <= 0) { + if 
(greaterThanOrEqualTo(key.get(), start) && lessThanOrEqualTo(key.get(), end)) { // It is assumed that the passed BytesWritable value is always a *single* PacketInfo object. Passing more than 1 // object will result in the whole set being passed through if any pass the filter. We cannot serialize PacketInfo // objects back to byte arrays, otherwise we could support more than one packet. @@ -144,56 +154,6 @@ public class PcapJob { } } - protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException { - List<Path> ret = new ArrayList<>(); - RemoteIterator<LocatedFileStatus> filesIt = fs.listFiles(basePath, true); - while(filesIt.hasNext()){ - ret.add(filesIt.next().getPath()); - } - return ret; - } - - public Iterable<String> getPaths(FileSystem fs, Path basePath, long begin, long end) throws IOException { - List<String> ret = new ArrayList<>(); - Iterator<Path> files = listFiles(fs, basePath).iterator(); - /* - The trick here is that we need a trailing left endpoint, because we only capture the start of the - timeseries kept in the file. 
- */ - boolean isFirst = true; - Path leftEndpoint = files.hasNext()?files.next():null; - if(leftEndpoint == null) { - return ret; - } - { - Long ts = PcapHelper.getTimestamp(leftEndpoint.getName()); - if(ts != null && Long.compareUnsigned(ts, begin) >= 0 && Long.compareUnsigned(ts, end) <= 0) { - ret.add(leftEndpoint.toString()); - isFirst = false; - } - } - while(files.hasNext()) { - Path p = files.next(); - Long ts = PcapHelper.getTimestamp(p.getName()); - if(ts != null && Long.compareUnsigned(ts, begin) >= 0 && Long.compareUnsigned(ts, end) <= 0) { - if(isFirst && leftEndpoint != null) { - ret.add(leftEndpoint.toString()); - } - if(isFirst) { - isFirst = false; - } - ret.add(p.toString()); - } - else { - leftEndpoint = p; - } - } - if(LOG.isDebugEnabled()) { - LOG.debug("Including files " + Joiner.on(",").join(ret)); - } - return ret; - } - /** * Returns a lazily-read Iterable over a set of sequence files */ @@ -207,9 +167,7 @@ public class PcapJob { } files.add(p); } - if (LOG.isDebugEnabled()) { - LOG.debug(outputPath); - } + LOG.debug("Output path={}", outputPath); Collections.sort(files, (o1,o2) -> o1.getName().compareTo(o2.getName())); return new SequenceFileIterable(files, config); } @@ -244,11 +202,14 @@ public class PcapJob { , fs , filterImpl ); + if (job == null) { + LOG.info("No files to process with specified date range."); + return new SequenceFileIterable(new ArrayList<>(), conf); + } boolean completed = job.waitForCompletion(true); if(completed) { return readResults(outputPath, conf, fs); - } - else { + } else { throw new RuntimeException("Unable to complete query due to errors. 
Please check logs for full errors."); } } @@ -282,11 +243,25 @@ public class PcapJob { job.setPartitionerClass(PcapPartitioner.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(BytesWritable.class); - SequenceFileInputFormat.addInputPaths(job, Joiner.on(',').join(getPaths(fs, basePath, beginNS, endNS ))); + Iterable<String> filteredPaths = FileFilterUtil.getPathsInTimeRange(beginNS, endNS, listFiles(fs, basePath)); + String inputPaths = Joiner.on(',').join(filteredPaths); + if (StringUtils.isEmpty(inputPaths)) { + return null; + } + SequenceFileInputFormat.addInputPaths(job, inputPaths); job.setInputFormatClass(SequenceFileInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); SequenceFileOutputFormat.setOutputPath(job, outputPath); return job; + } + protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException { + List<Path> ret = new ArrayList<>(); + RemoteIterator<LocatedFileStatus> filesIt = fs.listFiles(basePath, true); + while (filesIt.hasNext()) { + ret.add(filesIt.next().getPath()); + } + return ret; } + } http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/utils/FileFilterUtil.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/utils/FileFilterUtil.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/utils/FileFilterUtil.java new file mode 100644 index 0000000..bbbc4bb --- /dev/null +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/utils/FileFilterUtil.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.pcap.utils; + +import static org.apache.metron.pcap.PcapHelper.greaterThanOrEqualTo; +import static org.apache.metron.pcap.PcapHelper.lessThanOrEqualTo; + +import com.google.common.base.Joiner; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.fs.Path; +import org.apache.metron.pcap.PcapFilenameHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FileFilterUtil { + + private static final Logger LOG = LoggerFactory.getLogger(FileFilterUtil.class); + + private FileFilterUtil() { + } + + /* + * The trick here is that we need a trailing left endpoint, because we only capture the start of the + * timeseries kept in the file. 
+ */ + public static Iterable<String> getPathsInTimeRange(long beginTs, long endTs, + Iterable<Path> files) { + Map<Integer, List<Path>> filesByPartition = getFilesByPartition(files); + List<String> filteredFiles = filterByTimestampLT(beginTs, endTs, filesByPartition); + if (LOG.isDebugEnabled()) { + LOG.debug("Including files " + Joiner.on(",").join(filteredFiles)); + } + return filteredFiles; + } + + public static Map<Integer, List<Path>> getFilesByPartition(Iterable<Path> files) { + Iterator<Path> filesIt = files.iterator(); + Map<Integer, List<Path>> filesByPartition = new HashMap<>(); + while (filesIt.hasNext()) { + Path p = filesIt.next(); + Integer partition = PcapFilenameHelper.getKafkaPartition(p.getName()); + if (!filesByPartition.containsKey(partition)) { + filesByPartition.put(partition, new ArrayList<>()); + } + filesByPartition.get(partition).add(p); + } + return filesByPartition; + } + + /** + * Given a map of partition numbers to files, return a list of files filtered by the supplied + * beginning and ending timestamps. Includes a left-trailing file. + * + * @param filesByPartition list of files mapped to partitions. Incoming files do not need to be + * sorted as this method will perform a lexicographical sort in normal ascending order. + * @return filtered list of files, unsorted + */ + public static List<String> filterByTimestampLT(long beginTs, long endTs, + Map<Integer, List<Path>> filesByPartition) { + List<String> filteredFiles = new ArrayList<>(); + for (Integer key : filesByPartition.keySet()) { + List<Path> paths = filesByPartition.get(key); + filteredFiles.addAll(filterByTimestampLT(beginTs, endTs, paths)); + } + return filteredFiles; + } + + /** + * Return a list of files filtered by the supplied beginning and ending timestamps. Includes a + * left-trailing file. + * + * @param paths list of files. Incoming files do not need to be sorted as this method will perform + * a lexicographical sort in normal ascending order. 
+ * @return filtered list of files + */ + public static List<String> filterByTimestampLT(long beginTs, long endTs, List<Path> paths) { + List<String> filteredFiles = new ArrayList<>(); + + //noinspection unchecked - hadoop fs uses non-generic Comparable interface + Collections.sort(paths); + Iterator<Path> filesIt = paths.iterator(); + Path leftTrailing = filesIt.hasNext() ? filesIt.next() : null; + if (leftTrailing == null) { + return filteredFiles; + } + boolean first = true; + Long fileTS = PcapFilenameHelper.getTimestamp(leftTrailing.getName()); + if (fileTS != null + && greaterThanOrEqualTo(fileTS, beginTs) && lessThanOrEqualTo(fileTS, endTs)) { + filteredFiles.add(leftTrailing.toString()); + first = false; + } + + if (first && !filesIt.hasNext()) { + filteredFiles.add(leftTrailing.toString()); + return filteredFiles; + } + + while (filesIt.hasNext()) { + Path p = filesIt.next(); + fileTS = PcapFilenameHelper.getTimestamp(p.getName()); + if (fileTS != null + && greaterThanOrEqualTo(fileTS, beginTs) && lessThanOrEqualTo(fileTS, endTs)) { + if (first) { + filteredFiles.add(leftTrailing.toString()); + first = false; + } + filteredFiles.add(p.toString()); + } else { + leftTrailing = p; + } + } + + return filteredFiles; + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapFilenameHelperTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapFilenameHelperTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapFilenameHelperTest.java new file mode 100644 index 0000000..03778d0 --- /dev/null +++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapFilenameHelperTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.pcap; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +import org.junit.Test; + +public class PcapFilenameHelperTest { + + @Test + public void extracts_info_from_filename() { + { + String pcapFilename = "pcap_pcap128_1494962815457986000_18_pcap-63-1495027314"; + assertThat(PcapFilenameHelper.getKafkaTopic(pcapFilename), equalTo("pcap128")); + assertThat( + Long.compareUnsigned(PcapFilenameHelper.getTimestamp(pcapFilename), 1494962815457986000L), + equalTo(0)); + assertThat(PcapFilenameHelper.getKafkaPartition(pcapFilename), equalTo(18)); + assertThat(PcapFilenameHelper.getUUID(pcapFilename), equalTo("pcap-63-1495027314")); + } + { + String pcapFilename = "pcap_pcap-128_1494962815457986000_18_pcap-63-1495027314"; + assertThat(PcapFilenameHelper.getKafkaTopic(pcapFilename), equalTo("pcap-128")); + assertThat( + Long.compareUnsigned(PcapFilenameHelper.getTimestamp(pcapFilename), 1494962815457986000L), + equalTo(0)); + } + { + String pcapFilename = "pcap_pcap_128_1494962815457986000_18_pcap-63-1495027314"; + assertThat(PcapFilenameHelper.getKafkaTopic(pcapFilename), equalTo("pcap_128")); + assertThat( + Long.compareUnsigned(PcapFilenameHelper.getTimestamp(pcapFilename), 1494962815457986000L), + 
equalTo(0)); + } + { + String pcapFilename = "pcap_pcap___128___1494962815457986000_18_pcap-63-1495027314"; + assertThat(PcapFilenameHelper.getKafkaTopic(pcapFilename), equalTo("pcap___128__")); + assertThat( + Long.compareUnsigned(PcapFilenameHelper.getTimestamp(pcapFilename), 1494962815457986000L), + equalTo(0)); + } + { + String pcapFilename = "pcap___pcap___128___1494962815457986000_18_pcap-63-1495027314"; + assertThat(PcapFilenameHelper.getKafkaTopic(pcapFilename), equalTo("__pcap___128__")); + assertThat( + Long.compareUnsigned(PcapFilenameHelper.getTimestamp(pcapFilename), 1494962815457986000L), + equalTo(0)); + } + } + + @Test + public void extracts_null_info_from_bad_filename_parts() { + String pcapFilename = "pcap_pcap128_AAA4962815457986000_BB_pcap-63-1495027314"; + assertThat(PcapFilenameHelper.getTimestamp(pcapFilename), equalTo(null)); + assertThat(PcapFilenameHelper.getKafkaPartition(pcapFilename), equalTo(null)); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/e2197316/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/FileFilterUtilTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/FileFilterUtilTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/FileFilterUtilTest.java new file mode 100644 index 0000000..cc05a9a --- /dev/null +++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/mr/FileFilterUtilTest.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.metron.pcap.mr;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;

import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
import org.apache.metron.pcap.utils.FileFilterUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link FileFilterUtil}: grouping files by partition and selecting the files that
 * are relevant to a timestamp range, including the "left trailing" file.
 */
public class FileFilterUtilTest {

  /**
   * Fixed "query end" instant in epoch milliseconds (May 2017). It lies after the 2016-stamped
   * fixture files and before the 2019-stamped one. The tests previously used
   * {@code System.currentTimeMillis()}, which made {@code test_getPaths_leftEdge} start failing
   * once the wall clock passed the 2019 file's timestamp.
   */
  private static final long QUERY_END_MILLIS = 1495415325000L;

  /** {@link #QUERY_END_MILLIS} converted to the unit used in the pcap filenames. */
  private static final long QUERY_END_NANOS =
      TimestampConverters.MILLISECONDS.toNanoseconds(QUERY_END_MILLIS);

  /** Bounds used by the left-trailing tests. */
  private static final long RANGE_BEGIN = 1495135377055375000L;
  private static final long RANGE_END = 1495135512124943000L;

  private List<Path> filesIn;

  @Before
  public void setup() {
    filesIn = new ArrayList<>();
    filesIn.add(new Path("/apath/pcap_pcap5_1495135372055519000_2_pcap-9-1495134910"));
    filesIn.add(new Path("/apath/pcap_pcap5_1495135372168719000_1_pcap-9-1495134910"));
    filesIn.add(new Path("/apath/pcap_pcap5_1495135377055375000_0_pcap-9-1495134910"));
    filesIn.add(new Path("/apath/pcap_pcap5_1495135512102506000_4_pcap-9-1495134910"));
    filesIn.add(new Path("/apath/pcap_pcap5_1495135512123943000_3_pcap-9-1495134910"));
  }

  @Test
  public void returns_files_by_partition() {
    Map<Integer, List<Path>> filesByPartition = FileFilterUtil.getFilesByPartition(filesIn);
    Map<Integer, List<Path>> expectedFilesPartitioned = partitionedFiles();
    assertThat(filesByPartition, equalTo(expectedFilesPartitioned));
  }

  @Test
  public void returns_left_trailing_filtered_list() {
    Map<Integer, List<Path>> filesByPartition = partitionedFiles();
    List<String> lt = FileFilterUtil.filterByTimestampLT(RANGE_BEGIN, RANGE_END, filesByPartition);
    List<String> expectedFiles = allFilesInPartitionOrder();
    assertThat(lt, equalTo(expectedFiles));
  }

  @Test
  public void returns_left_trailing_filtered_list_from_paths() {
    Iterable<String> paths = FileFilterUtil.getPathsInTimeRange(RANGE_BEGIN, RANGE_END, filesIn);
    List<String> expectedFiles = allFilesInPartitionOrder();
    assertThat(paths, equalTo(expectedFiles));
  }

  @Test
  public void test_getPaths_NoFiles() throws Exception {
    final List<Path> inputFiles = new ArrayList<>();
    Iterable<String> paths = FileFilterUtil.getPathsInTimeRange(0, 1000, inputFiles);
    Assert.assertTrue(Iterables.isEmpty(paths));
  }

  @Test
  public void test_getPaths_leftEdge() throws Exception {
    // The second file is stamped after QUERY_END_NANOS, so only the first one is in range.
    final List<Path> inputFiles = toList(
        "/apps/metron/pcap/pcap_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094",
        "/apps/metron/pcap/pcap_pcap_1561589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094");
    Iterable<String> paths = FileFilterUtil.getPathsInTimeRange(0L, QUERY_END_NANOS, inputFiles);
    Assert.assertEquals(1, Iterables.size(paths));
  }

  @Test
  public void test_getPaths_rightEdge() throws Exception {
    {
      // Only the last file is in range; the file before it is kept as the left trailing file.
      final List<Path> inputFiles = toList(
          "/apps/metron/pcap/pcap0_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094",
          "/apps/metron/pcap/pcap1_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094");
      Iterable<String> paths = FileFilterUtil.getPathsInTimeRange(
          1461589333993573000L - 1L, 1461589333993573000L + 1L, inputFiles);
      Assert.assertEquals(2, Iterables.size(paths));
    }
    {
      // With three files, only the file immediately before the match is added.
      final List<Path> inputFiles = toList(
          "/apps/metron/pcap/pcap0_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094",
          "/apps/metron/pcap/pcap1_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094",
          "/apps/metron/pcap/pcap1_pcap_1461589334993573000_0_73686171-64a1-46e5-9e67-66cf603fb094");
      Iterable<String> paths = FileFilterUtil.getPathsInTimeRange(
          1461589334993573000L - 1L, 1461589334993573000L + 1L, inputFiles);
      Assert.assertEquals(2, Iterables.size(paths));
    }
  }

  @Test
  public void test_getPaths_bothEdges() throws Exception {
    // Every file is stamped before QUERY_END_NANOS, so all three are in range.
    final List<Path> inputFiles = toList(
        "/apps/metron/pcap/pcap_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094",
        "/apps/metron/pcap/pcap_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094",
        "/apps/metron/pcap/pcap1_pcap_1461589334993573000_0_73686171-64a1-46e5-9e67-66cf603fb094");
    Iterable<String> paths = FileFilterUtil.getPathsInTimeRange(0L, QUERY_END_NANOS, inputFiles);
    Assert.assertEquals(3, Iterables.size(paths));
  }

  /**
   * Builds a mutable list of {@link Path}s from string paths. The list is an {@link ArrayList}
   * so that code under test may sort it in place.
   */
  private List<Path> toList(String... items) {
    return Arrays.stream(items).map(Path::new).collect(Collectors.toCollection(ArrayList::new));
  }

  /** The {@link #setup()} files keyed by the partition number embedded in each filename. */
  private Map<Integer, List<Path>> partitionedFiles() {
    Map<Integer, List<Path>> byPartition = new HashMap<>();
    byPartition.put(0, toList("/apath/pcap_pcap5_1495135377055375000_0_pcap-9-1495134910"));
    byPartition.put(1, toList("/apath/pcap_pcap5_1495135372168719000_1_pcap-9-1495134910"));
    byPartition.put(2, toList("/apath/pcap_pcap5_1495135372055519000_2_pcap-9-1495134910"));
    byPartition.put(3, toList("/apath/pcap_pcap5_1495135512123943000_3_pcap-9-1495134910"));
    byPartition.put(4, toList("/apath/pcap_pcap5_1495135512102506000_4_pcap-9-1495134910"));
    return byPartition;
  }

  /** All five {@link #setup()} files as strings, ordered by partition number 0 through 4. */
  private List<String> allFilesInPartitionOrder() {
    return Arrays.asList(
        "/apath/pcap_pcap5_1495135377055375000_0_pcap-9-1495134910",
        "/apath/pcap_pcap5_1495135372168719000_1_pcap-9-1495134910",
        "/apath/pcap_pcap5_1495135372055519000_2_pcap-9-1495134910",
        "/apath/pcap_pcap5_1495135512123943000_3_pcap-9-1495134910",
        "/apath/pcap_pcap5_1495135512102506000_4_pcap-9-1495134910");
  }
}