Repository: metron
Updated Branches:
  refs/heads/master 308c2b27b -> 9fdccba37


http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
index 269f69b..05c494b 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
@@ -22,17 +22,17 @@ import static org.apache.metron.pcap.PcapHelper.greaterThanOrEqualTo;
 import static org.apache.metron.pcap.PcapHelper.lessThanOrEqualTo;
 
 import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.UUID;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
@@ -50,30 +50,43 @@ import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.metron.common.hadoop.SequenceFileIterable;
+import org.apache.metron.job.Finalizer;
+import org.apache.metron.job.JobException;
 import org.apache.metron.job.JobStatus;
 import org.apache.metron.job.JobStatus.State;
 import org.apache.metron.job.Pageable;
 import org.apache.metron.job.Statusable;
 import org.apache.metron.pcap.PacketInfo;
-import org.apache.metron.pcap.PcapFiles;
 import org.apache.metron.pcap.PcapHelper;
+import org.apache.metron.pcap.PcapPages;
+import org.apache.metron.pcap.config.PcapOptions;
 import org.apache.metron.pcap.filter.PcapFilter;
 import org.apache.metron.pcap.filter.PcapFilterConfigurator;
 import org.apache.metron.pcap.filter.PcapFilters;
 import org.apache.metron.pcap.utils.FileFilterUtil;
-import org.apache.metron.pcap.writer.ResultsWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PcapJob implements Statusable {
+/**
+ * Encompasses the MapReduce job and the final writing of Pageable results to a specified location.
+ * Cleans up MapReduce results from HDFS on completion.
+ */
+public class PcapJob<T> implements Statusable<Path> {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public static final String START_TS_CONF = "start_ts";
   public static final String END_TS_CONF = "end_ts";
   public static final String WIDTH_CONF = "width";
-  private Job job; // store a running MR job reference for async status check
-  private Path outputPath;
+  private static final long THREE_SECONDS = 3000;
+  private static final long ONE_SECOND = 1000;
+  private Job mrJob; // store a running MR job reference for async status check
+  private State jobState; // overall job state, including finalization step
+  private Finalizer<Path> finalizer;
+  private Map<String, Object> configuration;
+  private Pageable<Path> finalResults;
+  private Timer timer;
+  private long statusInterval; // how often timer thread checks job status.
+  private long completeCheckInterval; // how long we sleep between isDone checks in get()
 
   public static enum PCAP_COUNTER {
     MALFORMED_PACKET_COUNT
@@ -167,61 +180,92 @@ public class PcapJob implements Statusable {
     }
   }
 
+  public PcapJob() {
+    jobState = State.NOT_RUNNING;
+    finalResults = new PcapPages();
+    statusInterval = THREE_SECONDS;
+    completeCheckInterval = ONE_SECOND;
+  }
+
   /**
-   * Run query synchronously.
+   * Primarily for testing.
+   *
+   * @param interval time in millis
    */
-  public <T> SequenceFileIterable query(Path basePath
-                            , Path baseOutputPath
-                            , long beginNS
-                            , long endNS
-                            , int numReducers
-                            , T fields
-                            , Configuration conf
-                            , FileSystem fs
-                            , PcapFilterConfigurator<T> filterImpl
-                            ) throws IOException, ClassNotFoundException, InterruptedException {
-    Statusable statusable = query(Optional.empty(), basePath, baseOutputPath, beginNS, endNS, numReducers, fields,
-        conf,
-        fs, filterImpl, true);
-    JobStatus jobStatus = statusable.getStatus();
-    if (jobStatus.getState() == State.SUCCEEDED) {
-      Path resultPath = jobStatus.getResultPath();
-      return readResults(resultPath, conf, fs);
-    } else {
-      throw new RuntimeException(
-          "Unable to complete query due to errors.  Please check logs for full errors.");
+  public void setStatusInterval(long interval) {
+    statusInterval = interval;
+  }
+
+  /**
+   * Primarily for testing.
+   *
+   * @param interval time in millis
+   */
+  public void setCompleteCheckInterval(long interval) {
+    completeCheckInterval = interval;
+  }
+
+  @Override
+  public Statusable<Path> submit(Finalizer<Path> finalizer, Map<String, Object> configuration)
+      throws JobException {
+    this.finalizer = finalizer;
+    this.configuration = configuration;
+    Optional<String> jobName = Optional.ofNullable(PcapOptions.JOB_NAME.get(configuration, String.class));
+    Configuration hadoopConf = PcapOptions.HADOOP_CONF.get(configuration, Configuration.class);
+    FileSystem fileSystem = PcapOptions.FILESYSTEM.get(configuration, FileSystem.class);
+    Path basePath = PcapOptions.BASE_PATH.getTransformed(configuration, Path.class);
+    Path baseInterimResultPath = PcapOptions.BASE_INTERIM_RESULT_PATH.getTransformed(configuration, Path.class);
+    long startTime = PcapOptions.START_TIME_NS.get(configuration, Long.class);
+    long endTime = PcapOptions.END_TIME_NS.get(configuration, Long.class);
+    int numReducers = PcapOptions.NUM_REDUCERS.get(configuration, Integer.class);
+    T fields = (T) PcapOptions.FIELDS.get(configuration, Object.class);
+    PcapFilterConfigurator<T> filterImpl = PcapOptions.FILTER_IMPL.get(configuration, PcapFilterConfigurator.class);
+
+    try {
+      return query(jobName,
+          basePath,
+          baseInterimResultPath,
+          startTime,
+          endTime,
+          numReducers,
+          fields,
+          // create a new copy for each job, bad things happen when hadoop config is reused
+          new Configuration(hadoopConf),
+          fileSystem,
+          filterImpl);
+    } catch (IOException | InterruptedException | ClassNotFoundException e) {
+      throw new JobException("Failed to run pcap query.", e);
     }
   }
 
   /**
-   * Run query sync OR async based on flag. Async mode allows the client to check the returned
-   * statusable object for status details.
+   * Run query asynchronously.
    */
-  public <T> Statusable query(Optional<String> jobName,
+  public Statusable<Path> query(Optional<String> jobName,
       Path basePath,
-      Path baseOutputPath,
+      Path baseInterimResultPath,
       long beginNS,
       long endNS,
       int numReducers,
       T fields,
       Configuration conf,
       FileSystem fs,
-      PcapFilterConfigurator<T> filterImpl,
-      boolean sync)
+      PcapFilterConfigurator<T> filterImpl)
       throws IOException, ClassNotFoundException, InterruptedException {
     String outputDirName = Joiner.on("_").join(beginNS, endNS, filterImpl.queryToString(fields), UUID.randomUUID().toString());
     if(LOG.isDebugEnabled()) {
-      DateFormat format = SimpleDateFormat.getDateTimeInstance( SimpleDateFormat.LONG
+      DateFormat format = SimpleDateFormat.getDateTimeInstance(SimpleDateFormat.LONG
           , SimpleDateFormat.LONG
       );
       String from = format.format(new Date(Long.divideUnsigned(beginNS, 1000000)));
       String to = format.format(new Date(Long.divideUnsigned(endNS, 1000000)));
       LOG.debug("Executing query {} on timerange from {} to {}", filterImpl.queryToString(fields), from, to);
     }
-    outputPath =  new Path(baseOutputPath, outputDirName);
-    job = createJob(jobName
+    Path interimResultPath = new Path(baseInterimResultPath, outputDirName);
+    PcapOptions.INTERIM_RESULT_PATH.put(configuration, interimResultPath);
+    mrJob = createJob(jobName
         , basePath
-        , outputPath
+        , interimResultPath
         , beginNS
         , endNS
         , numReducers
@@ -230,69 +274,77 @@ public class PcapJob implements Statusable {
         , fs
         , filterImpl
     );
-    if (sync) {
-      job.waitForCompletion(true);
-    } else {
-      job.submit();
-    }
+    mrJob.submit();
+    jobState = State.RUNNING;
+    startJobStatusTimerThread(statusInterval);
     return this;
   }
 
-  /**
-   * Returns a lazily-read Iterable over a set of sequence files
-   */
-  private SequenceFileIterable readResults(Path outputPath, Configuration config, FileSystem fs) throws IOException {
-    List<Path> files = new ArrayList<>();
-    for (RemoteIterator<LocatedFileStatus> it = fs.listFiles(outputPath, false); it.hasNext(); ) {
-      Path p = it.next().getPath();
-      if (p.getName().equals("_SUCCESS")) {
-        fs.delete(p, false);
-        continue;
+  private void startJobStatusTimerThread(long interval) {
+    timer = new Timer();
+    timer.scheduleAtFixedRate(new TimerTask() {
+      @Override
+      public void run() {
+        try {
+          synchronized (PcapJob.this) { // lock the enclosing job, not the TimerTask, to pair with getStatus()/isDone()
+            if (jobState == State.RUNNING) {
+              if (mrJob.isComplete()) {
+                switch (mrJob.getStatus().getState()) {
+                  case SUCCEEDED:
+                    jobState = State.FINALIZING;
+                    if (setFinalResults(finalizer, configuration)) {
+                      jobState = State.SUCCEEDED;
+                    } else {
+                      jobState = State.FAILED;
+                    }
+                    break;
+                  case FAILED:
+                    jobState = State.FAILED;
+                    break;
+                  case KILLED:
+                    jobState = State.KILLED;
+                    break;
+                }
+                cancel(); // terminal state reached; stop this status-polling timer
+              }
+            }
+          }
+        } catch (InterruptedException | IOException e) {
+          jobState = State.FAILED;
+          cancel();
+        }
       }
-      files.add(p);
-    }
-    if (files.size() == 0) {
-      LOG.info("No files to process with specified date range.");
-    } else {
-      LOG.debug("Output path={}", outputPath);
-      Collections.sort(files, (o1, o2) -> o1.getName().compareTo(o2.getName()));
-    }
-    return new SequenceFileIterable(files, config);
+    }, interval, interval);
   }
 
-  public Pageable<Path> writeResults(SequenceFileIterable results, ResultsWriter resultsWriter,
-      Path outPath, int recPerFile, String prefix) throws IOException {
-    List<Path> outFiles = new ArrayList<>();
+  /**
+   * Writes out the final results using the supplied finalizer.
+   *
+   * @param finalizer Writes results.
+   * @param configuration Configures the finalizer.
+   * @return true on success, false otherwise.
+   */
+  private boolean setFinalResults(Finalizer<Path> finalizer, Map<String, Object> configuration) {
+    boolean success = true;
+    Pageable<Path> results = new PcapPages();
     try {
-      Iterable<List<byte[]>> partitions = Iterables.partition(results, recPerFile);
-      int part = 1;
-      if (partitions.iterator().hasNext()) {
-        for (List<byte[]> data : partitions) {
-          String outFileName = String.format("%s/pcap-data-%s+%04d.pcap", outPath, prefix, part++);
-          if (data.size() > 0) {
-            resultsWriter.write(new Configuration(), data, outFileName);
-            outFiles.add(new Path(outFileName));
-          }
-        }
-      } else {
-        LOG.info("No results returned.");
-      }
-    } finally {
-      try {
-        results.cleanup();
-      } catch (IOException e) {
-        LOG.warn("Unable to cleanup files in HDFS", e);
-      }
+      results = finalizer.finalizeJob(configuration);
+    } catch (JobException e) {
+      LOG.error("Failed to finalize job.", e);
+      success = false;
+    }
+    synchronized (this) {
+      finalResults = results;
     }
-    return new PcapFiles(outFiles);
+    return success;
   }
 
   /**
-   * Creates, but does not submit the job.
+   * Creates, but does not submit, the job. This is the core MapReduce job.
    */
-  public <T> Job createJob(Optional<String> jobName
+  public Job createJob(Optional<String> jobName
                       ,Path basePath
-                      , Path outputPath
+                      , Path jobOutputPath
                       , long beginNS
                       , long endNS
                       , int numReducers
@@ -325,7 +377,7 @@ public class PcapJob implements Statusable {
     SequenceFileInputFormat.addInputPaths(job, inputPaths);
     job.setInputFormatClass(SequenceFileInputFormat.class);
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
-    SequenceFileOutputFormat.setOutputPath(job, outputPath);
+    SequenceFileOutputFormat.setOutputPath(job, jobOutputPath);
     return job;
   }
 
@@ -343,55 +395,94 @@ public class PcapJob implements Statusable {
   }
 
   @Override
-  public JobStatus getStatus() {
-    // Note: this method is only reading state from the underlying job, so locking not needed
-    JobStatus status = new JobStatus().withResultPath(outputPath);
-    if (job == null) {
+  public JobType getJobType() {
+    return JobType.MAP_REDUCE;
+  }
+
+  /**
+   * Synchronized for mrJob and jobState
+   */
+  @Override
+  public synchronized JobStatus getStatus() throws JobException {
+    JobStatus status = new JobStatus();
+    if (mrJob == null) {
       status.withPercentComplete(100).withState(State.SUCCEEDED);
     } else {
       try {
-        status.withJobId(job.getStatus().getJobID().toString());
-        if (job.isComplete()) {
-          status.withPercentComplete(100);
-          switch (job.getStatus().getState()) {
-            case SUCCEEDED:
-              status.withState(State.SUCCEEDED).withDescription(State.SUCCEEDED.toString());
-              break;
-            case FAILED:
-              status.withState(State.FAILED);
-              break;
-            case KILLED:
-              status.withState(State.KILLED);
-              break;
-          }
+        org.apache.hadoop.mapreduce.JobStatus mrJobStatus = mrJob.getStatus();
+        status.withJobId(mrJobStatus.getJobID().toString());
+        if (jobState == State.SUCCEEDED) {
+          status.withPercentComplete(100).withState(State.SUCCEEDED)
+              .withDescription("Job complete");
         } else {
-          float mapProg = job.mapProgress();
-          float reduceProg = job.reduceProgress();
-          float totalProgress = ((mapProg / 2) + (reduceProg / 2)) * 100;
-          String description = String.format("map: %s%%, reduce: %s%%", mapProg * 100, reduceProg * 100);
-          status.withPercentComplete(totalProgress).withState(State.RUNNING)
-              .withDescription(description);
+          if (mrJob.isComplete()) {
+            status.withPercentComplete(100);
+            switch (mrJobStatus.getState()) {
+              case SUCCEEDED:
+                status.withState(State.FINALIZING).withDescription(State.FINALIZING.toString());
+                break;
+              case FAILED:
+                status.withState(State.FAILED).withDescription(State.FAILED.toString());
+                break;
+              case KILLED:
+                status.withState(State.KILLED).withDescription(State.KILLED.toString());
+                break;
+              default:
+                throw new IllegalStateException(
+                    "Unknown job state reported as 'complete' by mapreduce framework: "
+                        + mrJobStatus.getState());
+            }
+          } else {
+            float mapProg = mrJob.mapProgress();
+            float reduceProg = mrJob.reduceProgress();
+            float totalProgress = ((mapProg / 2) + (reduceProg / 2)) * 100;
+            String description = String
+                .format("map: %s%%, reduce: %s%%", mapProg * 100, reduceProg * 100);
+            status.withPercentComplete(totalProgress).withState(State.RUNNING)
+                .withDescription(description);
+          }
         }
       } catch (Exception e) {
-        throw new RuntimeException("Error occurred while attempting to retrieve job status.", e);
+        throw new JobException("Error occurred while attempting to retrieve job status.", e);
       }
     }
     return status;
   }
 
+  /**
+   * Synchronous call blocks until completion.
+   */
   @Override
-  public boolean isDone() {
-    // Note: this method is only reading state from the underlying job, so locking not needed
-    try {
-      return job.isComplete();
-    } catch (Exception e) {
-      throw new RuntimeException("Error occurred while attempting to retrieve job status.", e);
+  public Pageable<Path> get() throws JobException, InterruptedException {
+    for (; ; ) {
+      JobStatus status = getStatus();
+      if (status.getState() == State.SUCCEEDED
+          || status.getState() == State.KILLED
+          || status.getState() == State.FAILED) {
+        return getFinalResults();
+      }
+      Thread.sleep(completeCheckInterval);
     }
   }
 
+  private synchronized Pageable<Path> getFinalResults() {
+    return new PcapPages(finalResults);
+  }
+
+  @Override
+  public synchronized boolean isDone() {
+    return (jobState == State.SUCCEEDED
+        || jobState == State.KILLED
+        || jobState == State.FAILED);
+  }
+
   @Override
-  public void kill() throws IOException {
-    job.killJob();
+  public void kill() throws JobException {
+    try {
+      mrJob.killJob();
+    } catch (IOException e) {
+      throw new JobException("Unable to kill pcap job.", e);
+    }
   }
 
   @Override

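For illustration only (not part of this patch): a minimal caller sketch of the new asynchronous API, assuming a concrete Finalizer<Path> implementation is supplied and the remaining PcapOptions values are populated; the job name, paths, and the Map<String, String> fields type are placeholder assumptions.

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.metron.job.Finalizer;
import org.apache.metron.job.Pageable;
import org.apache.metron.pcap.config.PcapOptions;
import org.apache.metron.pcap.mr.PcapJob;

public class PcapJobUsageSketch {
  public static void run(Finalizer<Path> finalizer) throws Exception {
    Map<String, Object> config = new HashMap<>();
    PcapOptions.JOB_NAME.put(config, "example-pcap-query");
    PcapOptions.HADOOP_CONF.put(config, new Configuration());
    // FILESYSTEM, BASE_PATH, BASE_INTERIM_RESULT_PATH, START_TIME_NS,
    // END_TIME_NS, NUM_REDUCERS, FIELDS, and FILTER_IMPL must also be set;
    // submit() reads all of them from this map.

    PcapJob<Map<String, String>> job = new PcapJob<>();
    job.submit(finalizer, config);    // non-blocking: starts the MR job and the status timer
    Pageable<Path> pages = job.get(); // blocks, polling isDone() every completeCheckInterval
    for (int i = 0; i < pages.getSize(); i++) {
      Path page = pages.getPage(i);   // each page is one file of finalized results
    }
  }
}

Fully asynchronous clients can instead poll the same object via getStatus()/isDone(); get() simply wraps that polling loop.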
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapResultsWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapResultsWriter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapResultsWriter.java
new file mode 100644
index 0000000..62ac27c
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapResultsWriter.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.writer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.utils.HDFSUtils;
+import org.apache.metron.pcap.PcapMerger;
+
+public class PcapResultsWriter {
+
+  /**
+   * Write out pcaps. The Configuration determines whether the destination is HDFS or the local FS.
+   *
+   * @param config Standard hadoop filesystem config.
+   * @param pcaps pcap data to write. Pre-merged format as a list of pcaps as byte arrays.
+   * @param outPath where to write the pcap data to.
+   * @throws IOException I/O issue encountered.
+   */
+  public void write(Configuration config, List<byte[]> pcaps, String outPath) throws IOException {
+    HDFSUtils.write(config, mergePcaps(pcaps), outPath);
+  }
+
+  /**
+   * Creates a pcap file with proper global header from individual pcaps.
+   *
+   * @param pcaps pcap records to merge into a pcap file with header.
+   * @return merged result.
+   * @throws IOException I/O issue encountered.
+   */
+  public byte[] mergePcaps(List<byte[]> pcaps) throws IOException {
+    if (pcaps == null) {
+      return new byte[]{};
+    }
+    if (pcaps.size() == 1) {
+      return pcaps.get(0);
+    }
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PcapMerger.merge(baos, pcaps);
+    return baos.toByteArray();
+  }
+}

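For illustration only (not part of this patch): a hypothetical caller of the new writer. The byte arrays stand in for real pcap records produced by the MR job, and the output path is a placeholder.

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.metron.pcap.writer.PcapResultsWriter;

public class PcapResultsWriterSketch {
  public static void run(byte[] record1, byte[] record2) throws Exception {
    PcapResultsWriter writer = new PcapResultsWriter();
    List<byte[]> pcaps = Arrays.asList(record1, record2);
    // write() merges the records into a single pcap file with a proper global
    // header; the Configuration decides whether outPath resolves to HDFS or local FS.
    writer.write(new Configuration(), pcaps, "/tmp/pcap-data-example-0001.pcap");
  }
}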
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/ResultsWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/ResultsWriter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/ResultsWriter.java
deleted file mode 100644
index 3934aca..0000000
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/ResultsWriter.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcap.writer;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.metron.common.utils.HDFSUtils;
-import org.apache.metron.pcap.PcapMerger;
-
-public class ResultsWriter {
-
-  /**
-   * Write out pcaps. Configuration offers ability to configure for HDFS or local FS, if desired.
-   *
-   * @param config Standard hadoop filesystem config.
-   * @param pcaps pcap data to write. Pre-merged format as a list of pcaps as byte arrays.
-   * @param outPath where to write the pcap data to.
-   * @throws IOException I/O issue encountered.
-   */
-  public void write(Configuration config, List<byte[]> pcaps, String outPath) throws IOException {
-    HDFSUtils.write(config, mergePcaps(pcaps), outPath);
-  }
-
-  /**
-   * Creates a pcap file with proper global header from individual pcaps.
-   *
-   * @param pcaps pcap records to merge into a pcap file with header.
-   * @return merged result.
-   * @throws IOException I/O issue encountered.
-   */
-  public byte[] mergePcaps(List<byte[]> pcaps) throws IOException {
-    if (pcaps == null) {
-      return new byte[]{};
-    }
-    if (pcaps.size() == 1) {
-      return pcaps.get(0);
-    }
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    PcapMerger.merge(baos, pcaps);
-    return baos.toByteArray();
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapPagesTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapPagesTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapPagesTest.java
new file mode 100644
index 0000000..0be2bb5
--- /dev/null
+++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapPagesTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+public class PcapPagesTest {
+
+  @Test
+  public void iterates_paths() {
+    Path path1 = new Path("/1.txt");
+    Path path2 = new Path("/2.txt");
+    Path path3 = new Path("/3.txt");
+    List<Path> paths = new ArrayList<>();
+    paths.add(path1);
+    paths.add(path2);
+    paths.add(path3);
+    PcapPages pages = new PcapPages(paths);
+    assertThat("Wrong num pages.", pages.getSize(), equalTo(3));
+
+    for (int i = 0; i < pages.getSize(); i++) {
+      assertThat("Page should be equal", pages.getPage(i).toString(),
+          equalTo(paths.get(i).toString()));
+    }
+
+  }
+
+  @Test
+  public void clones_with_copy_constructor() {
+    Path path1 = new Path("/1.txt");
+    Path path2 = new Path("/2.txt");
+    Path path3 = new Path("/3.txt");
+    List<Path> paths = new ArrayList<>();
+    paths.add(path1);
+    paths.add(path2);
+    paths.add(path3);
+
+    PcapPages pages = new PcapPages(paths);
+    PcapPages clonedPages = new PcapPages(pages);
+    assertThat(clonedPages, notNullValue());
+    assertThat(clonedPages.getSize(), equalTo(3));
+    assertThat(clonedPages, not(sameInstance(pages)));
+
+    for (int i = 0; i < pages.getSize(); i++) {
+      assertThat("Page should be different instance.", pages.getPage(i),
+          not(sameInstance(clonedPages.getPage(i))));
+      assertThat("Page should be same path.", pages.getPage(i), equalTo(clonedPages.getPage(i)));
+    }
+  }
+
+}

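For illustration only (not part of this patch): a hypothetical sketch of paging through results via the Pageable<Path> interface that PcapPages backs, as exercised by the test above; the paths are placeholders.

import java.util.Arrays;
import org.apache.hadoop.fs.Path;
import org.apache.metron.job.Pageable;
import org.apache.metron.pcap.PcapPages;

public class PcapPagesSketch {
  public static void run() {
    Pageable<Path> pages = new PcapPages(Arrays.asList(new Path("/1.pcap"), new Path("/2.pcap")));
    for (int i = 0; i < pages.getSize(); i++) {
      System.out.println("page " + i + " -> " + pages.getPage(i)); // one results file per page
    }
  }
}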
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/pom.xml b/metron-platform/pom.xml
index c373ded..27a9941 100644
--- a/metron-platform/pom.xml
+++ b/metron-platform/pom.xml
@@ -52,7 +52,6 @@
                <module>metron-pcap</module>
                <module>metron-integration-test</module>
                <module>metron-test-utilities</module>
-               <module>metron-api</module>
                <module>metron-indexing</module>
                <module>metron-management</module>
                <module>metron-writer</module>
