http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java index 059bba2..95907df 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java @@ -18,27 +18,27 @@ package org.apache.metron.pcap.finalizer; -import java.util.Map; import org.apache.hadoop.fs.Path; +import org.apache.metron.job.Statusable; import org.apache.metron.pcap.config.PcapOptions; +import java.util.Map; + /** * Write to HDFS. */ public class PcapRestFinalizer extends PcapFinalizer { - /** - * Format will have the format <output-path>/page-<page-num>.pcap - * The filename prefix is pluggable, but in most cases it will be provided via the PcapConfig - * as a formatted timestamp + uuid. A final sample format will look as follows: - * /base/output/path/pcap-data-201807181911-09855b4ae3204dee8b63760d65198da3+0001.pcap - */ - private static final String PCAP_CLI_FILENAME_FORMAT = "%s/page-%s.pcap"; + private static final String PCAP_REST_FILEPATH_FORMAT = "%s/%s/%s/%s/page-%s.pcap"; + + private String jobType = Statusable.JobType.MAP_REDUCE.name(); @Override - protected String getOutputFileName(Map<String, Object> config, int partition) { - Path finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.getTransformed(config, Path.class); - return String.format(PCAP_CLI_FILENAME_FORMAT, finalOutputPath, partition); + protected Path getOutputPath(Map<String, Object> config, int partition) { + String finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.get(config, String.class); + String user = PcapOptions.USERNAME.get(config, String.class); + String jobId = PcapOptions.JOB_ID.get(config, String.class); + return new Path(String.format(PCAP_REST_FILEPATH_FORMAT, finalOutputPath, user, jobType, jobId, partition)); } }
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index 05c494b..1dd670d 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -215,14 +215,24 @@ public class PcapJob<T> implements Statusable<Path> { FileSystem fileSystem = PcapOptions.FILESYSTEM.get(configuration, FileSystem.class); Path basePath = PcapOptions.BASE_PATH.getTransformed(configuration, Path.class); Path baseInterimResultPath = PcapOptions.BASE_INTERIM_RESULT_PATH.getTransformed(configuration, Path.class); - long startTime = PcapOptions.START_TIME_NS.get(configuration, Long.class); - long endTime = PcapOptions.END_TIME_NS.get(configuration, Long.class); + long startTime; + if (configuration.containsKey(PcapOptions.START_TIME_NS.getKey())) { + startTime = PcapOptions.START_TIME_NS.get(configuration, Long.class); + } else { + startTime = PcapOptions.START_TIME_MS.get(configuration, Long.class) * 1000000; + } + long endTime; + if (configuration.containsKey(PcapOptions.END_TIME_NS.getKey())) { + endTime = PcapOptions.END_TIME_NS.get(configuration, Long.class); + } else { + endTime = PcapOptions.END_TIME_MS.get(configuration, Long.class) * 1000000; + } int numReducers = PcapOptions.NUM_REDUCERS.get(configuration, Integer.class); T fields = (T) PcapOptions.FIELDS.get(configuration, Object.class); PcapFilterConfigurator<T> filterImpl = PcapOptions.FILTER_IMPL.get(configuration, PcapFilterConfigurator.class); try { - return query(jobName, + Statusable<Path> statusable = query(jobName, basePath, baseInterimResultPath, startTime, @@ -233,6 +243,8 @@ public class PcapJob<T> implements Statusable<Path> { new Configuration(hadoopConf), fileSystem, filterImpl); + PcapOptions.JOB_ID.put(configuration, statusable.getStatus().getJobId()); + return statusable; } catch (IOException | InterruptedException | ClassNotFoundException e) { throw new JobException("Failed to run pcap query.", e); }