METRON-1725 Add ability to specify YARN queue for pcap jobs (merrimanr) closes apache/metron#1153
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/7a8c2467 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/7a8c2467 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/7a8c2467 Branch: refs/heads/master Commit: 7a8c246748a2c9e8c5b9230800b075dd99a7f3a4 Parents: 73dc63e Author: merrimanr <merrim...@gmail.com> Authored: Fri Aug 10 16:46:31 2018 -0500 Committer: rmerriman <merrim...@gmail.com> Committed: Fri Aug 10 16:46:31 2018 -0500 ---------------------------------------------------------------------- .../CURRENT/configuration/metron-rest-env.xml | 9 ++++++ .../package/scripts/params/params_linux.py | 1 + .../METRON/CURRENT/package/templates/metron.j2 | 1 + .../METRON/CURRENT/themes/metron_theme.json | 10 ++++++ metron-interface/metron-rest/README.md | 2 ++ .../src/main/config/rest_application.yml | 1 + .../apache/metron/rest/MetronRestConstants.java | 1 + .../metron/rest/config/PcapJobSupplier.java | 2 +- .../rest/service/impl/PcapServiceImpl.java | 12 ++++++- .../apache/metron/rest/mock/MockPcapJob.java | 8 +++++ .../rest/service/impl/PcapServiceImplTest.java | 7 ++++- metron-platform/metron-pcap-backend/README.md | 2 ++ .../org/apache/metron/pcap/query/CliParser.java | 4 +++ .../org/apache/metron/pcap/query/PcapCli.java | 3 ++ .../apache/metron/pcap/query/PcapCliTest.java | 33 ++++++++++++++++++-- .../apache/metron/pcap/config/PcapConfig.java | 10 ++++++ 16 files changed, 101 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml index 20f9767..895c091 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml @@ -192,5 +192,14 @@ <description>The number of pcaps written to a page/file as a result of a pcap query.</description> <value>10</value> </property> + <property> + <name>pcap_yarn_queue</name> + <display-name>Pcap YARN Queue</display-name> + <description>The YARN queue pcap jobs will be submitted to.</description> + <value/> + <value-attributes> + <empty-value-valid>true</empty-value-valid> + </value-attributes> + </property> </configuration> http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py index 73d3469..4f8a9a7 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py @@ -390,6 +390,7 @@ pcap_base_path = config['configurations']['metron-rest-env']['pcap_base_path'] pcap_base_interim_result_path = config['configurations']['metron-rest-env']['pcap_base_interim_result_path'] pcap_final_output_path = config['configurations']['metron-rest-env']['pcap_final_output_path'] pcap_page_size = config['configurations']['metron-rest-env']['pcap_page_size'] +pcap_yarn_queue = config['configurations']['metron-rest-env']['pcap_yarn_queue'] pcap_configured_flag_file = status_params.pcap_configured_flag_file # MapReduce http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 index 278d6f8..55422d0 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 @@ -44,3 +44,4 @@ PCAP_BASE_PATH="{{pcap_base_path}}" PCAP_BASE_INTERIM_RESULT_PATH="{{pcap_base_interim_result_path}}" PCAP_FINAL_OUTPUT_PATH="{{pcap_final_output_path}}" PCAP_PAGE_SIZE="{{pcap_page_size}}" +PCAP_YARN_QUEUE="{{pcap_yarn_queue}}" http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json index 9f5b04e..db06b61 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json @@ -824,6 +824,10 @@ "subsection-name": "subsection-rest" }, { + "config": "metron-rest-env/pcap_yarn_queue", + "subsection-name": "subsection-rest" + }, + { "config": "metron-management-ui-env/metron_management_ui_port", "subsection-name": "subsection-management-ui" }, @@ -1431,6 +1435,12 @@ } }, { + "config": "metron-rest-env/pcap_yarn_queue", + "widget": { + "type": "text-field" + } + }, + { "config": "metron-management-ui-env/metron_management_ui_port", "widget": { "type": "text-field" http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-interface/metron-rest/README.md ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/README.md b/metron-interface/metron-rest/README.md index 489cd9f..d19d8c3 100644 --- a/metron-interface/metron-rest/README.md +++ b/metron-interface/metron-rest/README.md @@ -221,6 +221,8 @@ The REST application uses a Java Process object to call out to the `pcap_to_pdml Out of the box it is a simple wrapper around the tshark command to transform raw pcap data to PDML. However it can be extended to do additional processing as long as the expected input/output is maintained. REST will supply the script with raw pcap data through standard in and expects PDML data serialized as XML. +Pcap query jobs can be configured for submission to a YARN queue. This setting is exposed as the Spring property `pcap.yarn.queue`. If configured, the REST application will set the `mapreduce.job.queuename` Hadoop property to that value. + ## API Request and Response objects are JSON formatted. The JSON schemas are available in the Swagger UI. http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-interface/metron-rest/src/main/config/rest_application.yml ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/config/rest_application.yml b/metron-interface/metron-rest/src/main/config/rest_application.yml index 7486112..e25ad82 100644 --- a/metron-interface/metron-rest/src/main/config/rest_application.yml +++ b/metron-interface/metron-rest/src/main/config/rest_application.yml @@ -60,3 +60,4 @@ pcap: base.interim.result.path: ${PCAP_BASE_INTERIM_RESULT_PATH} final.output.path: ${PCAP_FINAL_OUTPUT_PATH} page.size: ${PCAP_PAGE_SIZE} + yarn.queue: ${PCAP_YARN_QUEUE} http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java index d38aedb..02655298 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java @@ -81,4 +81,5 @@ public class MetronRestConstants { public static final String PCAP_FINAL_OUTPUT_PATH_SPRING_PROPERTY = "pcap.final.output.path"; public static final String PCAP_PAGE_SIZE_SPRING_PROPERTY = "pcap.page.size"; public static final String PCAP_PDML_SCRIPT_PATH_SPRING_PROPERTY = "pcap.pdml.script.path"; + public static final String PCAP_YARN_QUEUE_SPRING_PROPERTY = "pcap.yarn.queue"; } http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java index 1e79f6a..538e41a 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java @@ -39,7 +39,7 @@ public class PcapJobSupplier implements Supplier<Statusable<Path>> { PcapJob<Path> pcapJob = createPcapJob(); return pcapJob.submit(PcapFinalizerStrategies.REST, pcapRequest); } catch (JobException e) { - throw new RuntimeJobException(e.getMessage()); + throw new RuntimeJobException(e.getMessage(), e); } } http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java index ae3f807..db2e17b 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.dataformat.xml.XmlMapper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.metron.job.JobException; import org.apache.metron.job.JobNotFoundException; import org.apache.metron.job.JobStatus; @@ -57,6 +58,8 @@ import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.Map; +import static org.apache.metron.rest.MetronRestConstants.PCAP_YARN_QUEUE_SPRING_PROPERTY; + @Service public class PcapServiceImpl implements PcapService { @@ -250,7 +253,14 @@ public class PcapServiceImpl implements PcapService { protected void setPcapOptions(String username, PcapRequest pcapRequest) throws IOException { PcapOptions.JOB_NAME.put(pcapRequest, "jobName"); PcapOptions.USERNAME.put(pcapRequest, username); - PcapOptions.HADOOP_CONF.put(pcapRequest, configuration); + Configuration hadoopConf = new Configuration(configuration); + if (environment.containsProperty(PCAP_YARN_QUEUE_SPRING_PROPERTY)) { + String queue = environment.getProperty(PCAP_YARN_QUEUE_SPRING_PROPERTY); + if (queue != null && !queue.isEmpty()) { + hadoopConf.set(MRJobConfig.QUEUE_NAME, environment.getProperty(PCAP_YARN_QUEUE_SPRING_PROPERTY)); + } + } + PcapOptions.HADOOP_CONF.put(pcapRequest, hadoopConf); PcapOptions.FILESYSTEM.put(pcapRequest, getFileSystem()); if (pcapRequest.getBasePath() == null) { http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java index 779589d..c977faa 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java @@ -17,7 +17,9 @@ */ package org.apache.metron.rest.mock; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.metron.job.Finalizer; import org.apache.metron.job.JobException; import org.apache.metron.job.JobStatus; @@ -45,6 +47,7 @@ public class MockPcapJob extends PcapJob<Path> { private PcapFilterConfigurator filterImpl; private int recPerFile; private String query; + private String yarnQueue; private Statusable<Path> statusable; public MockPcapJob() { @@ -68,6 +71,7 @@ public class MockPcapJob extends PcapJob<Path> { } this.filterImpl = PcapOptions.FILTER_IMPL.get(configuration, PcapFilterConfigurator.class); this.recPerFile = PcapOptions.NUM_RECORDS_PER_FILE.get(configuration, Integer.class); + this.yarnQueue = PcapOptions.HADOOP_CONF.get(configuration, Configuration.class).get(MRJobConfig.QUEUE_NAME); return statusable; } @@ -144,4 +148,8 @@ public class MockPcapJob extends PcapJob<Path> { public int getRecPerFile() { return recPerFile; } + + public String getYarnQueue() { + return yarnQueue; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java index d539c71..6635598 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.metron.common.Constants; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.job.JobException; @@ -186,7 +187,7 @@ public class PcapServiceImplTest { @Before public void setUp() throws Exception { environment = mock(Environment.class); - configuration = mock(Configuration.class); + configuration = new Configuration(); mockPcapJobSupplier = new MockPcapJobSupplier(); pcapToPdmlScriptWrapper = new PcapToPdmlScriptWrapper(); @@ -200,6 +201,9 @@ public class PcapServiceImplTest { @Test public void submitShouldProperlySubmitFixedPcapRequest() throws Exception { + when(environment.containsProperty(MetronRestConstants.PCAP_YARN_QUEUE_SPRING_PROPERTY)).thenReturn(true); + when(environment.getProperty(MetronRestConstants.PCAP_YARN_QUEUE_SPRING_PROPERTY)).thenReturn("pcap"); + FixedPcapRequest fixedPcapRequest = new FixedPcapRequest(); fixedPcapRequest.setBasePath("basePath"); fixedPcapRequest.setBaseInterimResultPath("baseOutputPath"); @@ -250,6 +254,7 @@ public class PcapServiceImplTest { Assert.assertEquals(2000000, mockPcapJob.getEndTimeNs()); Assert.assertEquals(2, mockPcapJob.getNumReducers()); Assert.assertEquals(100, mockPcapJob.getRecPerFile()); + Assert.assertEquals("pcap", mockPcapJob.getYarnQueue()); Assert.assertTrue(mockPcapJob.getFilterImpl() instanceof FixedPcapFilter.Configurator); Map<String, String> actualFixedFields = mockPcapJob.getFixedFields(); Assert.assertEquals("ip_src_addr", actualFixedFields.get(Constants.Fields.SRC_ADDR.getName())); http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-platform/metron-pcap-backend/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/README.md b/metron-platform/metron-pcap-backend/README.md index 2ff20d8..031328d 100644 --- a/metron-platform/metron-pcap-backend/README.md +++ b/metron-platform/metron-pcap-backend/README.md @@ -139,6 +139,7 @@ usage: Fixed filter options -sa,--ip_src_addr <arg> Source IP address -sp,--ip_src_port <arg> Source port -st,--start_time <arg> (required) Packet start time range. + -yq,--yarn_queue <arg> Yarn queue this job will be submitted to ``` ``` @@ -158,6 +159,7 @@ usage: Query filter options -ps,--print_status Print the status of the job as it runs -q,--query <arg> Query string to use as a filter -st,--start_time <arg> (required) Packet start time range. + -yq,--yarn_queue <arg> Yarn queue this job will be submitted to ``` The Query filter's `--query` argument specifies the Stellar expression to http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java index 69c725c..5040f90 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java @@ -56,6 +56,7 @@ public class CliParser { options.addOption(newOption("et", "end_time", true, "Packet end time range. Default is current system time.")); options.addOption(newOption("df", "date_format", true, "Date format to use for parsing start_time and end_time. Default is to use time in millis since the epoch.")); options.addOption(newOption("ps", "print_status", false, "Print the status of the job as it runs")); + options.addOption(newOption("yq", "yarn_queue", true, "Yarn queue this job will be submitted to")); return options; } @@ -129,6 +130,9 @@ public class CliParser { if (commandLine.hasOption("print_status")) { config.setPrintJobStatus(true); } + if (commandLine.hasOption("yarn_queue")) { + config.setYarnQueue(commandLine.getOptionValue("yarn_queue")); + } } public void printHelp(String msg, Options opts) { http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java index c23f037..eebf366 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java @@ -26,6 +26,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.metron.common.utils.timestamp.TimestampConverters; import org.apache.metron.job.JobException; @@ -99,6 +100,7 @@ public class PcapCli { return 0; } PcapOptions.FILTER_IMPL.put(commonConfig, new FixedPcapFilter.Configurator()); + config.getYarnQueue().ifPresent(s -> hadoopConf.set(MRJobConfig.QUEUE_NAME, s)); PcapOptions.HADOOP_CONF.put(commonConfig, hadoopConf); try { PcapOptions.FILESYSTEM.put(commonConfig, FileSystem.get(hadoopConf)); @@ -124,6 +126,7 @@ public class PcapCli { return 0; } PcapOptions.FILTER_IMPL.put(commonConfig, new FixedPcapFilter.Configurator()); + config.getYarnQueue().ifPresent(s -> hadoopConf.set(MRJobConfig.QUEUE_NAME, s)); PcapOptions.HADOOP_CONF.put(commonConfig, hadoopConf); try { PcapOptions.FILESYSTEM.put(commonConfig, FileSystem.get(hadoopConf)); http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java index 7c75224..a71e997 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java @@ -33,10 +33,18 @@ import java.io.PrintStream; import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.metron.common.Constants; import org.apache.metron.common.system.Clock; import org.apache.metron.common.utils.timestamp.TimestampConverters; @@ -114,7 +122,24 @@ public class PcapCliTest { return new TypeSafeMatcher<Map<K, V>>() { @Override protected boolean matchesSafely(Map<K, V> item) { - return item.entrySet().containsAll(map.entrySet()); + for(K key: map.keySet()) { + if (key.equals(PcapOptions.HADOOP_CONF.getKey())) { + Configuration itemConfiguration = (Configuration) item.get(PcapOptions.HADOOP_CONF.getKey()); + Map<String, Object> mapConfiguration = (Map<String, Object>) map.get(PcapOptions.HADOOP_CONF.getKey()); + for(String setting: mapConfiguration.keySet()) { + if (!mapConfiguration.get(setting).equals(itemConfiguration.get(setting, ""))) { + return false; + } + } + } else { + V itemValue = item.get(key); + V mapValue = map.get(key); + if (itemValue != null ? !itemValue.equals(mapValue) : mapValue != null) { + return false; + } + } + } + return true; } @Override @@ -192,7 +217,8 @@ public class PcapCliTest { "-include_reverse", "-num_reducers", "10", "-records_per_file", "1000", - "-ps" + "-ps", + "-yq", "pcap" }; Map<String, String> query = new HashMap<String, String>() {{ put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1"); @@ -215,6 +241,9 @@ public class PcapCliTest { PcapOptions.END_TIME_MS.put(config, endAsNanos / 1000000L); // needed bc defaults in config PcapOptions.NUM_RECORDS_PER_FILE.put(config, 1000); PcapOptions.PRINT_JOB_STATUS.put(config, true); + PcapOptions.HADOOP_CONF.put(config, new HashMap<String, Object>() {{ + put(MRJobConfig.QUEUE_NAME, "pcap"); + }}); when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner); http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java index cbb8170..4a08e14 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java @@ -25,6 +25,7 @@ import org.apache.metron.common.configuration.ConfigOption; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.HashMap; +import java.util.Optional; import java.util.function.Function; public class PcapConfig extends AbstractMapDecorator<String, Object>{ @@ -32,6 +33,7 @@ public class PcapConfig extends AbstractMapDecorator<String, Object>{ private boolean showHelp; private DateFormat dateFormat; + private String yarnQueue; public PcapConfig() { super(new HashMap<>()); @@ -137,4 +139,12 @@ public class PcapConfig extends AbstractMapDecorator<String, Object>{ public void setNumRecordsPerFile(int numRecordsPerFile) { PcapOptions.NUM_RECORDS_PER_FILE.put(this, numRecordsPerFile); } + + public void setYarnQueue(String yarnQueue) { + this.yarnQueue = yarnQueue; + } + + public Optional<String> getYarnQueue() { + return Optional.ofNullable(yarnQueue); + } }