http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/metron-job_state_statechart_diagram.svg ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/metron-job_state_statechart_diagram.svg b/metron-platform/metron-job/metron-job_state_statechart_diagram.svg new file mode 100644 index 0000000..a99c5ad --- /dev/null +++ b/metron-platform/metron-job/metron-job_state_statechart_diagram.svg @@ -0,0 +1,14 @@ +<!-- + Licensed to the Apache Software + Foundation (ASF) under one or more contributor license agreements. See the + NOTICE file distributed with this work for additional information regarding + copyright ownership. The ASF licenses this file to You under the Apache License, + Version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. 
+ --> +<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd"> +<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" width="661px" height="291px" version="1.1" style="background-color: rgb(255, 255, 255);"><defs/><g transform="translate(0.5,0.5)"><ellipse cx="15" cy="30" rx="11" ry="11" fill="#000000" stroke="#ff0000" transform="rotate(90,15,30)" pointer-events="none"/><rect x="110" y="0" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(123.5,23.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="92" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 92px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">NOT_RUNNING</div></div></foreignObject><text x="46" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">NOT_RUNNING</text></switch></g><path d="M 440 30 L 537.76 30" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 529.88 34.5 L 538.88 30 L 529.88 25.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><rect x="320" y="0" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(350.5,23.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="58" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; 
vertical-align: top; width: 60px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">RUNNING</div></div></foreignObject><text x="29" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">RUNNING</text></switch></g><rect x="540" y="0" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(563.5,23.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="72" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 74px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">FINALIZING</div></div></foreignObject><text x="36" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">FINALIZING</text></switch></g><path d="M 30 30 L 107.76 30" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 99.88 34.5 L 108.88 30 L 99.88 25.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 230 30 L 317.76 30" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 309.88 34.5 L 318.88 30 L 309.88 25.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><rect x="320" y="120" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(357.5,143.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="44" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div 
xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 44px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">KILLED</div></div></foreignObject><text x="22" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">KILLED</text></switch></g><rect x="320" y="230" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(357.5,253.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="44" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 44px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">FAILED</div></div></foreignObject><text x="22" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">FAILED</text></switch></g><rect x="540" y="120" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(562.5,143.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="75" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 76px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" 
style="display:inline-block;text-align:inherit;text-decoration:inherit;">SUCCEEDED</div></div></foreignObject><text x="38" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">SUCCEEDED</text></switch></g><path d="M 380 60 L 380 117.76" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 375.5 109.88 L 380 118.88 L 384.5 109.88" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 600 60 L 600 117.76" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 595.5 109.88 L 600 118.88 L 604.5 109.88" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 540 45 L 500 45 Q 490 45 490 55 L 490 250 Q 490 260 480 260 L 442.24 260" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 450.12 255.5 L 441.12 260 L 450.12 264.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 320 45 L 290 45 Q 280 45 280 55 L 280 250 Q 280 260 290 260 L 317.76 260" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 309.88 264.5 L 318.88 260 L 309.88 255.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 540 45 L 500 45 Q 490 45 490 55 L 490 140 Q 490 150 480 150 L 442.24 150" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 450.12 145.5 L 441.12 150 L 450.12 154.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/></g></svg>
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/metron-job_state_statechart_diagram.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/metron-job_state_statechart_diagram.xml b/metron-platform/metron-job/metron-job_state_statechart_diagram.xml new file mode 100644 index 0000000..b9ee8aa --- /dev/null +++ b/metron-platform/metron-job/metron-job_state_statechart_diagram.xml @@ -0,0 +1,14 @@ +<!-- + Licensed to the Apache Software + Foundation (ASF) under one or more contributor license agreements. See the + NOTICE file distributed with this work for additional information regarding + copyright ownership. The ASF licenses this file to You under the Apache License, + Version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. + --> +<!-- This is a draw.io diagram. 
You can load it from http://www.draw.io --> +<mxfile userAgent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.87 Safari/537.36" version="8.9.5" editor="www.draw.io" type="google"><diagram id="58cdce13-f638-feb5-8d6f-7d28b1aa9fa0" name="Page-1">7VrbctowEP0aHtvxTdg8BkJSpintlKZN+tJRbGFrIixGiAL9+kq2fAfiOhjIFF6wdqWVtOfsauVxxxzM1rcMzoNP1EOkY2jeumNedwzD0GxN/EnJJpb0eiAW+Ax7sUjPBBP8BymhGucvsYcWhY6cUsLxvCh0aRgilxdkkDG6KnabUlKcdQ59VBFMXEiq0h/Y40EsdYCWyT8g7AfJzLqmNE/QffYZXYZqvo5hTqNfrJ7BxJbqvwigR1c5kTnsmANGKY+fZusBItK3idvicTc7tOm6GQp5rQGOY1sAOK6NtCdNs98pC78hWaJkC9FC+SZxjrAgcBCNfsBnRMh08Sg2Mpf6BYeMTzjkUj/FhAwooSwaaGrRT3bmjD6jnGY6VZrIdcjLjMbekSqXzrCrngl8QqSf+jqxFNIwmpaG/AbOMJHs+46YB0OoxIpquqHa21YHCfZDIXOFD5FQ9j3MBMkwlcIFXUoI+8pNiHG03ul7PUVURAqiM8TZRnRRAxzFARUjaXuVMc5UoiBHtkQGFcf91HCGs3hQUNeE3ajAPv787dfX+/F4NL6tMKCI0irAHE3m0JXalUgIRWZA5iqvG1aVEzI23P2cOBMWHABxvVeCvAsqkOvGFsy7bWBuViPbE1lQNSnjAfVpCMkwk+ZxRWvMH3LPjxKV90C2QrG0BwVS1Mh0+zFDoXcl07ecf47CWKLQcvaSpCbaCaoETXmKqdx2AVER5MxFO9xmqfMI Mh/xHX1AbWYwRCDHv4vzHxRmqxLal7A+bFhb2jmFNajgfTMaX92Nfl4gPxzkXeOcIO++LpO//Zyc3BH25WTjjHKyfTl6m8Bs1IDZOjzMaugXioXFNAWYpWLOLBfm8VbUqPwd7AVDhlYyFO+3YihiXbqfZkR0KofFx9Hd3fD6clC0VBuYzikPil61NrgaXeBuD26rd0q4k9dpObwn94PBcHh9gby1UvC0Ea7rFWAblRaqnFDFhV4oLbJK47EWaG+htKhzq3eOVVoAs0QpCzQrLbovGWqxtNC3vDn+Jya+Gb7tjpkmTAQ1mJgk9vapaNslBoGmVLR729PkMaj42ledCRX1zpnfqlIqFm6GNmiRi71jUbFbuXA1pWLZUO+IVLQOlRX/MyrWOaCPR8XyuWo0vPuXT3pQ5vQOKgok4SbXbS47LHYv2OyWFqxre9dV7g8MUIqEeAWHiwtwSdGtpeijFa4nj4uXqSia2ecVcffsGxZz+Bc=</diagram></mxfile> http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/Finalizer.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Finalizer.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Finalizer.java new file mode 100644 index 0000000..5001cfa --- /dev/null 
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Finalizer.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.job; + +import java.util.Map; + +/** + * Finalize a job. + * + * @param <PAGE_T> Type for the Pageable. + */ +public interface Finalizer<PAGE_T> { + + /** + * Run any routines for finalizing a job. + * + * @param config options to be used by the finalization process. + * @return Pageable results. + */ + Pageable<PAGE_T> finalizeJob(Map<String, Object> config) throws JobException; + +} http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobException.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobException.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobException.java new file mode 100644 index 0000000..10096cd --- /dev/null +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobException.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.job; + +public class JobException extends Exception { + + public JobException(String message) { + super(message); + } + + public JobException(String message, Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java index ec006fb..5a2f485 100644 --- a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java @@ -18,8 +18,6 @@ package org.apache.metron.job; -import org.apache.hadoop.fs.Path; - /** * Capture metadata about a batch job. 
*/ @@ -29,6 +27,7 @@ public class JobStatus { NOT_RUNNING, RUNNING, SUCCEEDED, + FINALIZING, FAILED, KILLED } @@ -37,7 +36,7 @@ public class JobStatus { private State state = State.NOT_RUNNING; private double percentComplete = 0.0; private String description; - private Path resultPath; + private long completionTime; public JobStatus withJobId(String jobId) { this.jobId = jobId; @@ -59,11 +58,15 @@ public class JobStatus { return this; } - public JobStatus withResultPath(Path resultPath) { - this.resultPath = resultPath; + public JobStatus withCompletionTime(long completionTime) { + this.completionTime = completionTime; return this; } + public String getJobId() { + return jobId; + } + public State getState() { return state; } @@ -76,8 +79,8 @@ public class JobStatus { return description; } - public Path getResultPath() { - return resultPath; + public long getCompletionTime() { + return completionTime; } } http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java index 1038ab8..d93c7de 100644 --- a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java @@ -18,14 +18,7 @@ package org.apache.metron.job; -public interface Pageable<T> { - - /** - * Transform into an Iterable. - * - * @return Iterable version of this Pageable. - */ - Iterable<T> asIterable(); +public interface Pageable<T> extends Iterable<T> { /** * Provides access to a specific page of results in the result set. @@ -35,4 +28,11 @@ public interface Pageable<T> { */ T getPage(int num); + /** + * Number of pages in this Pageable. 
+ * + * @return number of pages + */ + int getSize(); + } http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java index 7a8fc02..9bdea35 100644 --- a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java @@ -18,20 +18,44 @@ package org.apache.metron.job; -import java.io.IOException; import java.util.Map; /** * Abstraction for getting status on running jobs. Also provides options for killing and validating. */ -public interface Statusable { +public interface Statusable<PAGE_T> { + + enum JobType { + MAP_REDUCE; + } + + /** + * Submit the job asynchronously. + * + * @return self + */ + Statusable<PAGE_T> submit(Finalizer<PAGE_T> finalizer, Map<String, Object> configuration) throws JobException; + + /** + * Synchronous call. + * + * @return pages of results + */ + Pageable<PAGE_T> get() throws JobException, InterruptedException; + + /** + * Execution framework type of this job. + * + * @return type of job + */ + JobType getJobType(); /** * Current job status. * * @return status */ - JobStatus getStatus(); + JobStatus getStatus() throws JobException; /** * Completion flag. @@ -43,7 +67,7 @@ public interface Statusable { /** * Kill job. */ - void kill() throws IOException; + void kill() throws JobException; /** * Validate job after submitted. 
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java new file mode 100644 index 0000000..bf0baa7 --- /dev/null +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.metron.job.manager; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.metron.job.JobException; +import org.apache.metron.job.JobStatus; +import org.apache.metron.job.Statusable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class InMemoryJobManager<PAGE_T> implements JobManager<PAGE_T> { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private Map<String, Map<String, Statusable<PAGE_T>>> jobs; + + public InMemoryJobManager() { + this.jobs = Collections.synchronizedMap(new HashMap<>()); + } + + @Override + public JobStatus submit(Supplier<Statusable<PAGE_T>> jobSupplier, String username) + throws JobException { + Map<String, Statusable<PAGE_T>> userJobs = getUserJobs(username); + Statusable<PAGE_T> job = jobSupplier.get(); + userJobs.put(job.getStatus().getJobId(), job); + jobs.put(username, userJobs); + return job.getStatus(); + } + + @Override + public JobStatus getStatus(String username, String jobId) throws JobException { + return jobs.get(username).get(jobId).getStatus(); + } + + @Override + public boolean done(String username, String jobId) throws JobException { + return getJob(username, jobId).isDone(); + } + + @Override + public void killJob(String username, String jobId) throws JobException { + getJob(username, jobId).kill(); + } + + @Override + public Statusable<PAGE_T> getJob(String username, String jobId) throws JobException { + return getUserJobs(username).get(jobId); + } + + private Map<String, Statusable<PAGE_T>> getUserJobs(String username) { + return jobs.getOrDefault(username, Collections.synchronizedMap(new HashMap<>())); + } + + @Override + public List<Statusable<PAGE_T>> getJobs(String username) throws JobException { + return new 
ArrayList<Statusable<PAGE_T>>(getUserJobs(username).values()); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/JobManager.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/JobManager.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/JobManager.java new file mode 100644 index 0000000..eff60e5 --- /dev/null +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/JobManager.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.metron.job.manager; + +import java.util.List; +import java.util.function.Supplier; +import org.apache.metron.job.JobException; +import org.apache.metron.job.JobStatus; +import org.apache.metron.job.Statusable; + +public interface JobManager<PAGE_T> { + + JobStatus submit(Supplier<Statusable<PAGE_T>> jobSupplier, String username) throws JobException; + + JobStatus getStatus(String username, String jobId) throws JobException; + + boolean done(String username, String jobId) throws JobException; + + void killJob(String username, String jobId) throws JobException; + + Statusable<PAGE_T> getJob(String username, String jobId) throws JobException; + + List<Statusable<PAGE_T>> getJobs(String username) throws JobException; + +} http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/test/java/org/apache/metron/job/manager/InMemoryJobManagerTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/src/test/java/org/apache/metron/job/manager/InMemoryJobManagerTest.java b/metron-platform/metron-job/src/test/java/org/apache/metron/job/manager/InMemoryJobManagerTest.java new file mode 100644 index 0000000..f3a3978 --- /dev/null +++ b/metron-platform/metron-job/src/test/java/org/apache/metron/job/manager/InMemoryJobManagerTest.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.job.manager; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.hadoop.fs.Path; +import org.apache.metron.job.Finalizer; +import org.apache.metron.job.JobException; +import org.apache.metron.job.JobStatus; +import org.apache.metron.job.JobStatus.State; +import org.apache.metron.job.Pageable; +import org.apache.metron.job.Statusable; +import org.apache.metron.job.Statusable.JobType; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class InMemoryJobManagerTest { + + @Rule + public TemporaryFolder tempDir = new TemporaryFolder(); + @Mock + private Statusable<Path> job1; + @Mock + private Statusable<Path> job2; + @Mock + private Statusable<Path> job3; + @Mock + private Finalizer<Path> finalizer; + @Mock + private Pageable<Path> results; + private JobManager<Path> jm; + private Map<String, Object> config; + private String username1; + private String username2; + private String jobId1; + private String jobId2; + private String jobId3; + private String basePath; + + @Before + public void setup() throws JobException { + MockitoAnnotations.initMocks(this); + jm = new 
InMemoryJobManager<Path>(); + config = new HashMap<>(); + username1 = "user123"; + username2 = "user456"; + jobId1 = "job_abc_123"; + jobId2 = "job_def_456"; + jobId3 = "job_ghi_789"; + basePath = tempDir.getRoot().getAbsolutePath(); + when(job1.getJobType()).thenReturn(JobType.MAP_REDUCE); + when(job2.getJobType()).thenReturn(JobType.MAP_REDUCE); + when(job3.getJobType()).thenReturn(JobType.MAP_REDUCE); + when(job1.submit(finalizer, config)).thenReturn(job1); + when(job2.submit(finalizer, config)).thenReturn(job2); + when(job3.submit(finalizer, config)).thenReturn(job3); + when(finalizer.finalizeJob(any())).thenReturn(results); + } + + @Test + public void submits_job_and_returns_status() throws JobException { + when(job1.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId1)); + JobStatus status = jm.submit(newSupplier(job1), username1); + assertThat(status.getState(), equalTo(State.RUNNING)); + assertThat(status.getJobId(), equalTo(jobId1)); + when(job1.getStatus()).thenReturn(new JobStatus().withState(State.SUCCEEDED).withJobId(jobId1)); + status = jm.getStatus(username1, status.getJobId()); + assertThat(status.getState(), equalTo(State.SUCCEEDED)); + assertThat(status.getJobId(), equalTo(jobId1)); + } + + @Test + public void submits_multiple_jobs_and_returns_status() throws JobException { + when(job1.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId1)); + when(job2.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId2)); + when(job3.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId3)); + + // user has 1 job + jm.submit(newSupplier(job1), username1); + assertThat(jm.getJob(username1, jobId1), equalTo(job1)); + + // user has 2 jobs + jm.submit(newSupplier(job2), username1); + assertThat(jm.getJob(username1, jobId1), equalTo(job1)); + assertThat(jm.getJob(username1, jobId2), equalTo(job2)); + + // user has 3 jobs + jm.submit(newSupplier(job3), 
username1); + assertThat(jm.getJob(username1, jobId1), equalTo(job1)); + assertThat(jm.getJob(username1, jobId2), equalTo(job2)); + assertThat(jm.getJob(username1, jobId3), equalTo(job3)); + + // multiple users have 3 jobs + jm.submit(newSupplier(job1), username2); + jm.submit(newSupplier(job2), username2); + jm.submit(newSupplier(job3), username2); + // user 1 still good + assertThat(jm.getJob(username1, jobId1), equalTo(job1)); + assertThat(jm.getJob(username1, jobId2), equalTo(job2)); + assertThat(jm.getJob(username1, jobId3), equalTo(job3)); + // and also user 2 + assertThat(jm.getJob(username2, jobId1), equalTo(job1)); + assertThat(jm.getJob(username2, jobId2), equalTo(job2)); + assertThat(jm.getJob(username2, jobId3), equalTo(job3)); + } + + @Test + public void returns_job_status() throws JobException { + JobStatus expected = new JobStatus().withState(State.SUCCEEDED).withJobId(jobId1); + when(job1.getStatus()).thenReturn(expected); + jm.submit(newSupplier(job1), username1); + JobStatus status = jm.getStatus(username1, jobId1); + assertThat(status, equalTo(expected)); + } + + @Test + public void returns_job_is_done() throws JobException { + JobStatus expected = new JobStatus().withState(State.SUCCEEDED).withJobId(jobId1); + when(job1.getStatus()).thenReturn(expected); + when(job1.isDone()).thenReturn(true); + jm.submit(newSupplier(job1), username1); + boolean done = jm.done(username1, jobId1); + assertThat(done, equalTo(true)); + } + + @Test + public void kills_job() throws JobException { + when(job1.getStatus()).thenReturn(new JobStatus().withState(State.SUCCEEDED).withJobId(jobId1)); + jm.submit(newSupplier(job1), username1); + jm.killJob(username1, jobId1); + verify(job1).kill(); + } + + @Test + public void gets_list_of_user_jobs() throws JobException { + when(job1.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId1)); + when(job2.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId2)); + 
when(job3.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId3)); + jm.submit(newSupplier(job1), username1); + jm.submit(newSupplier(job2), username1); + jm.submit(newSupplier(job3), username1); + jm.submit(newSupplier(job1), username2); + jm.submit(newSupplier(job2), username2); + jm.submit(newSupplier(job3), username2); + List<Statusable<Path>> jobsUser1 = jm.getJobs(username1); + List<Statusable<Path>> jobsUser2 = jm.getJobs(username2); + assertThat("Wrong size", jobsUser1.size(), equalTo(3)); + assertThat("Wrong size", jobsUser2.size(), equalTo(3)); + assertThat("", jobsUser1.containsAll(Arrays.asList(job1, job2, job3)), equalTo(true)); + assertThat("", jobsUser2.containsAll(Arrays.asList(job1, job2, job3)), equalTo(true)); + } + + private Supplier<Statusable<Path>> newSupplier(Statusable<Path> job) { + return () -> { + try { + return job.submit(finalizer, config); + } catch (JobException e) { + throw new RuntimeException("Something went wrong", e); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/pom.xml b/metron-platform/metron-pcap-backend/pom.xml index c3b1a69..fb523ee 100644 --- a/metron-platform/metron-pcap-backend/pom.xml +++ b/metron-platform/metron-pcap-backend/pom.xml @@ -218,6 +218,12 @@ <version>${project.parent.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>1.3</version> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java ---------------------------------------------------------------------- diff --git 
a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java deleted file mode 100644 index 1d8e3f3..0000000 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.metron.pcap.query; - -import org.apache.commons.lang3.StringUtils; -import org.apache.metron.common.system.Clock; - -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.UUID; -import java.util.function.Consumer; -import java.util.function.Function; - -public class CliConfig { - public interface PrefixStrategy extends Function<Clock, String>{} - - private boolean showHelp; - private String prefix; - private String basePath; - private String baseOutputPath; - private long startTime; - private long endTime; - private int numReducers; - private int numRecordsPerFile; - private DateFormat dateFormat; - - - public CliConfig(PrefixStrategy prefixStrategy) { - showHelp = false; - basePath = ""; - baseOutputPath = ""; - startTime = -1L; - endTime = -1L; - numReducers = 0; - prefix = prefixStrategy.apply(new Clock()); - } - - public String getPrefix() { - return prefix; - } - - public void setPrefix(String prefix) { - this.prefix = prefix; - } - - public int getNumReducers() { - return numReducers; - } - - public boolean showHelp() { - return showHelp; - } - - public void setShowHelp(boolean showHelp) { - this.showHelp = showHelp; - } - - public String getBasePath() { - return basePath; - } - - public String getBaseOutputPath() { - return baseOutputPath; - } - - public long getStartTime() { - return startTime; - } - - public long getEndTime() { - return endTime; - } - - public void setBasePath(String basePath) { - this.basePath = basePath; - } - - public void setBaseOutputPath(String baseOutputPath) { - this.baseOutputPath = baseOutputPath; - } - - public void setStartTime(long startTime) { - this.startTime = startTime; - } - - public void setEndTime(long endTime) { - this.endTime = endTime; - } - - public boolean isNullOrEmpty(String val) { - return StringUtils.isEmpty(val); - } - - public void setDateFormat(String dateFormat) { - this.dateFormat = new SimpleDateFormat(dateFormat); - } - - public DateFormat 
getDateFormat() { - return dateFormat; - } - - public void setNumReducers(int numReducers) { - this.numReducers = numReducers; - } - - public int getNumRecordsPerFile() { - return numRecordsPerFile; - } - - public void setNumRecordsPerFile(int numRecordsPerFile) { - this.numRecordsPerFile = numRecordsPerFile; - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java index d5976ae..e6534c5 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java @@ -19,19 +19,20 @@ package org.apache.metron.pcap.query; import org.apache.commons.cli.*; +import org.apache.metron.pcap.config.PcapConfig; /** * Provides commmon required fields for the PCAP filter jobs */ public class CliParser { public static final String BASE_PATH_DEFAULT = "/apps/metron/pcap"; - public static final String BASE_OUTPUT_PATH_DEFAULT = "/tmp"; + public static final String BASE_INTERIM_OUTPUT_PATH_DEFAULT = "/tmp"; public static final int NUM_REDUCERS_DEFAULT = 10; public static final int NUM_RECORDS_PER_FILE_DEFAULT = 10000; private CommandLineParser parser; - protected CliConfig.PrefixStrategy prefixStrategy; + protected PcapConfig.PrefixStrategy prefixStrategy; - public CliParser(CliConfig.PrefixStrategy prefixStrategy) { + public CliParser(PcapConfig.PrefixStrategy prefixStrategy) { this.prefixStrategy = prefixStrategy; parser = new PosixParser(); } @@ -40,7 +41,8 @@ public class CliParser { Options options = new Options(); options.addOption(newOption("h", "help", false, "Display 
help")); options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", BASE_PATH_DEFAULT))); - options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", BASE_OUTPUT_PATH_DEFAULT))); + options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", + BASE_INTERIM_OUTPUT_PATH_DEFAULT))); options.addOption(newOption("st", "start_time", true, "(required) Packet start time range.", true)); options.addOption(newOption("nr", "num_reducers", true, String.format("Number of reducers to use (defaults to %s)", NUM_REDUCERS_DEFAULT))); options.addOption(newOption("rpf", "records_per_file", true, String.format("Number of records to include in each output pcap file (defaults to %s)", NUM_RECORDS_PER_FILE_DEFAULT))); @@ -59,7 +61,7 @@ public class CliParser { return option; } - public void parse(CommandLine commandLine, CliConfig config) throws java.text.ParseException { + public void parse(CommandLine commandLine, PcapConfig config) throws java.text.ParseException { if (commandLine.hasOption("help")) { config.setShowHelp(true); } @@ -72,18 +74,18 @@ public class CliParser { config.setBasePath(BASE_PATH_DEFAULT); } if (commandLine.hasOption("base_output_path")) { - config.setBaseOutputPath(commandLine.getOptionValue("base_output_path")); + config.setBaseInterimResultPath(commandLine.getOptionValue("base_output_path")); } else { - config.setBaseOutputPath(BASE_OUTPUT_PATH_DEFAULT); + config.setBaseInterimResultPath(BASE_INTERIM_OUTPUT_PATH_DEFAULT); } if (commandLine.hasOption("start_time")) { try { if (commandLine.hasOption("date_format")) { long startTime = config.getDateFormat().parse(commandLine.getOptionValue("start_time")).getTime(); - config.setStartTime(startTime); + config.setStartTimeMs(startTime); } else { long startTime = Long.parseLong(commandLine.getOptionValue("start_time")); - 
config.setStartTime(startTime); + config.setStartTimeMs(startTime); } } catch (NumberFormatException nfe) { //no-op @@ -107,10 +109,10 @@ public class CliParser { try { if (commandLine.hasOption("date_format")) { long endTime = config.getDateFormat().parse(commandLine.getOptionValue("end_time")).getTime(); - config.setEndTime(endTime); + config.setEndTimeMs(endTime); } else { long endTime = Long.parseLong(commandLine.getOptionValue("end_time")); - config.setEndTime(endTime); + config.setEndTimeMs(endTime); } } catch (NumberFormatException nfe) { //no-op http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java deleted file mode 100644 index 03caed7..0000000 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.metron.pcap.query; - -import org.apache.metron.common.Constants; - -import java.util.EnumMap; -import java.util.LinkedHashMap; -import java.util.Map; - -public class FixedCliConfig extends CliConfig { - - private Map<String, String> fixedFields; - - public FixedCliConfig(PrefixStrategy prefixStrategy) { - super(prefixStrategy); - this.fixedFields = new LinkedHashMap<>(); - } - - public Map<String, String> getFixedFields() { - return fixedFields; - } - - public void setFixedFields(Map<String, String> fixedFields) { - this.fixedFields = fixedFields; - } - - public void putFixedField(String key, String value) { - String trimmedVal = value != null ? value.trim() : null; - if (!isNullOrEmpty(trimmedVal)) { - this.fixedFields.put(key, value); - } - } - -} http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java index 4e1bfcf..19d351c 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java @@ -22,11 +22,13 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.metron.common.Constants; import org.apache.metron.pcap.PcapHelper; +import org.apache.metron.pcap.config.FixedPcapConfig; +import org.apache.metron.pcap.config.PcapConfig; public class FixedCliParser extends CliParser { private Options fixedOptions; - public FixedCliParser(CliConfig.PrefixStrategy prefixStrategy) { + public FixedCliParser(PcapConfig.PrefixStrategy prefixStrategy) { super(prefixStrategy); 
fixedOptions = buildFixedOptions(); } @@ -51,9 +53,9 @@ public class FixedCliParser extends CliParser { * @return Configuration tailored to fixed pcap queries * @throws ParseException */ - public FixedCliConfig parse(String[] args) throws ParseException, java.text.ParseException { + public FixedPcapConfig parse(String[] args) throws ParseException, java.text.ParseException { CommandLine commandLine = getParser().parse(fixedOptions, args); - FixedCliConfig config = new FixedCliConfig(prefixStrategy); + FixedPcapConfig config = new FixedPcapConfig(prefixStrategy); super.parse(commandLine, config); config.putFixedField(Constants.Fields.SRC_ADDR.getName(), commandLine.getOptionValue("ip_src_addr")); config.putFixedField(Constants.Fields.DST_ADDR.getName(), commandLine.getOptionValue("ip_dst_addr")); @@ -63,7 +65,7 @@ public class FixedCliParser extends CliParser { config.putFixedField(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), Boolean.toString(commandLine.hasOption("include_reverse"))); config.putFixedField(PcapHelper.PacketFields.PACKET_FILTER.getName(), commandLine.getOptionValue("packet_filter")); if(commandLine.hasOption("prefix")) { - config.setPrefix(commandLine.getOptionValue("prefix")); + config.setFinalFilenamePrefix(commandLine.getOptionValue("prefix")); } return config; } http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java index 0fda801..3462921 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java @@ -27,45 +27,46 @@ import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.GenericOptionsParser; -import org.apache.metron.common.hadoop.SequenceFileIterable; -import org.apache.metron.common.system.Clock; import org.apache.metron.common.utils.timestamp.TimestampConverters; +import org.apache.metron.job.JobException; +import org.apache.metron.job.Pageable; +import org.apache.metron.pcap.config.PcapOptions; +import org.apache.metron.pcap.config.FixedPcapConfig; +import org.apache.metron.pcap.config.PcapConfig; +import org.apache.metron.pcap.config.QueryPcapConfig; import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; -import org.apache.metron.pcap.filter.query.QueryPcapFilter; +import org.apache.metron.pcap.finalizer.PcapFinalizerStrategies; import org.apache.metron.pcap.mr.PcapJob; -import org.apache.metron.pcap.writer.ResultsWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PcapCli { private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - public static final CliConfig.PrefixStrategy PREFIX_STRATEGY = clock -> { - String timestamp = new Clock().currentTimeFormatted("yyyyMMddHHmm"); + public static final PcapConfig.PrefixStrategy PREFIX_STRATEGY = clock -> { + String timestamp = clock.currentTimeFormatted("yyyyMMddHHmm"); String uuid = UUID.randomUUID().toString().replaceAll("-", ""); return String.format("%s-%s", timestamp, uuid); }; private final PcapJob jobRunner; - private final ResultsWriter resultsWriter; - private final CliConfig.PrefixStrategy prefixStrategy; + private final PcapConfig.PrefixStrategy prefixStrategy; public static void main(String[] args) { - int status = new PcapCli(new PcapJob(), new ResultsWriter(), PREFIX_STRATEGY).run(args); + int status = new PcapCli(new PcapJob(), PREFIX_STRATEGY).run(args); System.exit(status); } - public PcapCli(PcapJob jobRunner, ResultsWriter resultsWriter, 
CliConfig.PrefixStrategy prefixStrategy) { + public PcapCli(PcapJob jobRunner, PcapConfig.PrefixStrategy prefixStrategy) { this.jobRunner = jobRunner; - this.resultsWriter = resultsWriter; this.prefixStrategy = prefixStrategy; } + public int run(String[] args) { if (args.length < 1) { printBasicHelp(); return -1; } String jobType = args[0]; - SequenceFileIterable results = null; String[] commandArgs = Arrays.copyOfRange(args, 1, args.length); Configuration hadoopConf = new Configuration(); String[] otherArgs = null; @@ -75,13 +76,18 @@ public class PcapCli { LOGGER.error("Failed to configure hadoop with provided options: {}", e.getMessage(), e); return -1; } - CliConfig commonConfig = null; + PcapConfig commonConfig = null; + Pageable<Path> results; + // write to local FS in the executing directory + String execDir = System.getProperty("user.dir"); + if ("fixed".equals(jobType)) { FixedCliParser fixedParser = new FixedCliParser(prefixStrategy); - FixedCliConfig config = null; + FixedPcapConfig config = null; try { config = fixedParser.parse(otherArgs); commonConfig = config; + PcapOptions.FINAL_OUTPUT_PATH.put(commonConfig, new Path("file:///" + execDir)); } catch (ParseException | java.text.ParseException e) { System.err.println(e.getMessage()); System.err.flush(); @@ -92,28 +98,24 @@ public class PcapCli { fixedParser.printHelp(); return 0; } - Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTime(), config.getEndTime()); + Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTimeMs(), config.getEndTimeMs()); long startTime = time.getLeft(); long endTime = time.getRight(); + PcapOptions.START_TIME_NS.put(commonConfig, startTime); + PcapOptions.END_TIME_NS.put(commonConfig, endTime); + PcapOptions.FILTER_IMPL.put(commonConfig, new FixedPcapFilter.Configurator()); + PcapOptions.HADOOP_CONF.put(commonConfig, hadoopConf); try { - results = jobRunner.query( - new Path(config.getBasePath()), - new Path(config.getBaseOutputPath()), - 
startTime, - endTime, - config.getNumReducers(), - config.getFixedFields(), - hadoopConf, - FileSystem.get(hadoopConf), - new FixedPcapFilter.Configurator()); - } catch (IOException | ClassNotFoundException | InterruptedException e) { + PcapOptions.FILESYSTEM.put(commonConfig, FileSystem.get(hadoopConf)); + results = jobRunner.submit(PcapFinalizerStrategies.CLI, commonConfig).get(); + } catch (IOException|InterruptedException | JobException e) { LOGGER.error("Failed to execute fixed filter job: {}", e.getMessage(), e); return -1; } } else if ("query".equals(jobType)) { QueryCliParser queryParser = new QueryCliParser(prefixStrategy); - QueryCliConfig config = null; + QueryPcapConfig config = null; try { config = queryParser.parse(otherArgs); commonConfig = config; @@ -126,23 +128,19 @@ public class PcapCli { queryParser.printHelp(); return 0; } - Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTime(), config.getEndTime()); + Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTimeMs(), config.getEndTimeMs()); long startTime = time.getLeft(); long endTime = time.getRight(); + PcapOptions.START_TIME_NS.put(commonConfig, startTime); + PcapOptions.END_TIME_NS.put(commonConfig, endTime); + PcapOptions.FILTER_IMPL.put(commonConfig, new FixedPcapFilter.Configurator()); + PcapOptions.HADOOP_CONF.put(commonConfig, hadoopConf); try { - results = jobRunner.query( - new Path(config.getBasePath()), - new Path(config.getBaseOutputPath()), - startTime, - endTime, - config.getNumReducers(), - config.getQuery(), - hadoopConf, - FileSystem.get(hadoopConf), - new QueryPcapFilter.Configurator()); - } catch (IOException | ClassNotFoundException | InterruptedException e) { - LOGGER.error("Failed to execute query filter job: {}", e.getMessage(), e); + PcapOptions.FILESYSTEM.put(commonConfig, FileSystem.get(hadoopConf)); + results = jobRunner.submit(PcapFinalizerStrategies.CLI, commonConfig).get(); + } catch (IOException| InterruptedException | 
JobException e) { + LOGGER.error("Failed to execute fixed filter job: {}", e.getMessage(), e); return -1; } } else { @@ -150,17 +148,6 @@ public class PcapCli { return -1; } - try { - // write to local FS in the executing directory - String execDir = System.getProperty("user.dir"); - jobRunner.writeResults(results, resultsWriter, new Path("file:///" + execDir), - commonConfig.getNumRecordsPerFile(), - commonConfig.getPrefix()); - } catch (IOException e) { - LOGGER.error("Unable to write file", e); - return -1; - } - return 0; } http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliConfig.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliConfig.java deleted file mode 100644 index 67f045f..0000000 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliConfig.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.metron.pcap.query; - -public class QueryCliConfig extends CliConfig { - private String query; - - public QueryCliConfig(PrefixStrategy prefixStrategy) { - super(prefixStrategy); - } - - public String getQuery() { - return query; - } - - public void setQuery(String query) { - this.query = query; - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java index d6e5cd1..b4375d1 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java @@ -20,11 +20,13 @@ package org.apache.metron.pcap.query; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.metron.pcap.config.PcapConfig; +import org.apache.metron.pcap.config.QueryPcapConfig; public class QueryCliParser extends CliParser { private Options queryOptions; - public QueryCliParser(CliConfig.PrefixStrategy prefixStrategy) { + public QueryCliParser(PcapConfig.PrefixStrategy prefixStrategy) { super(prefixStrategy); queryOptions = setupOptions(); } @@ -43,15 +45,15 @@ public class QueryCliParser extends CliParser { * @return Configuration tailored to query pcap queries * @throws ParseException */ - public QueryCliConfig parse(String[] args) throws ParseException, java.text.ParseException { + public QueryPcapConfig parse(String[] args) throws ParseException, java.text.ParseException { CommandLine commandLine = getParser().parse(queryOptions, args); - QueryCliConfig 
config = new QueryCliConfig(prefixStrategy); + QueryPcapConfig config = new QueryPcapConfig(prefixStrategy); super.parse(commandLine, config); if (commandLine.hasOption("query")) { config.setQuery(commandLine.getOptionValue("query")); } if(commandLine.hasOption("prefix")) { - config.setPrefix(commandLine.getOptionValue("prefix")); + config.setFinalFilenamePrefix(commandLine.getOptionValue("prefix")); } return config; } http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java index 5a5d406..1e389d9 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java @@ -19,15 +19,12 @@ package org.apache.metron.pcap; import static java.lang.Long.toUnsignedString; -import static java.lang.String.format; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.CoreMatchers.startsWith; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -39,9 +36,13 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; import org.apache.metron.common.utils.timestamp.TimestampConverters; +import org.apache.metron.job.Finalizer; import org.apache.metron.job.JobStatus; import org.apache.metron.job.JobStatus.State; +import 
org.apache.metron.job.Pageable; import org.apache.metron.job.Statusable; +import org.apache.metron.pcap.config.FixedPcapConfig; +import org.apache.metron.pcap.config.PcapOptions; import org.apache.metron.pcap.filter.PcapFilterConfigurator; import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; import org.apache.metron.pcap.mr.PcapJob; @@ -54,33 +55,62 @@ import org.mockito.MockitoAnnotations; public class PcapJobTest { @Mock - private Job job; + private Job mrJob; @Mock private org.apache.hadoop.mapreduce.JobStatus mrStatus; @Mock private JobID jobId; - private static final String JOB_ID_VAL = "job_abc_123"; + @Mock + private Finalizer<Path> finalizer; + private Pageable<Path> pageableResult; + private FixedPcapConfig config; + private Configuration hadoopConfig; + private FileSystem fileSystem; + private String jobIdVal = "job_abc_123"; private Path basePath; private Path baseOutPath; private long startTime; private long endTime; private int numReducers; + private int numRecordsPerFile; + private Path finalOutputPath; private Map<String, String> fixedFields; - private Configuration hadoopConfig; + private PcapJob<Map<String, String>> testJob; + @Before - public void setup() { + public void setup() throws IOException { MockitoAnnotations.initMocks(this); basePath = new Path("basepath"); baseOutPath = new Path("outpath"); startTime = 100; endTime = 200; numReducers = 5; + numRecordsPerFile = 5; fixedFields = new HashMap<>(); fixedFields.put("ip_src_addr", "192.168.1.1"); hadoopConfig = new Configuration(); - when(jobId.toString()).thenReturn(JOB_ID_VAL); + fileSystem = FileSystem.get(hadoopConfig); + finalOutputPath = new Path("finaloutpath"); + when(jobId.toString()).thenReturn(jobIdVal); when(mrStatus.getJobID()).thenReturn(jobId); + pageableResult = new PcapPages(); + // handles setting the file name prefix under the hood + config = new FixedPcapConfig(clock -> "clockprefix"); + PcapOptions.HADOOP_CONF.put(config, hadoopConfig); + 
PcapOptions.FILESYSTEM.put(config, FileSystem.get(hadoopConfig)); + PcapOptions.BASE_PATH.put(config, basePath); + PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, baseOutPath); + PcapOptions.START_TIME_NS.put(config, startTime); + PcapOptions.END_TIME_NS.put(config, endTime); + PcapOptions.NUM_REDUCERS.put(config, numReducers); + PcapOptions.FIELDS.put(config, fixedFields); + PcapOptions.FILTER_IMPL.put(config, new FixedPcapFilter.Configurator()); + PcapOptions.NUM_RECORDS_PER_FILE.put(config, numRecordsPerFile); + PcapOptions.FINAL_OUTPUT_PATH.put(config, finalOutputPath); + testJob = new TestJob<>(); + testJob.setStatusInterval(10); + testJob.setCompleteCheckInterval(10); } @Test @@ -98,147 +128,94 @@ public class PcapJobTest { equalTo(8)); } - private class TestJob extends PcapJob { + private class TestJob<T> extends PcapJob<T> { @Override - public <T> Job createJob(Optional<String> jobName, Path basePath, Path outputPath, long beginNS, long endNS, - int numReducers, T fields, Configuration conf, FileSystem fs, + public Job createJob(Optional<String> jobName, + Path basePath, + Path outputPath, + long beginNS, + long endNS, + int numReducers, + T fields, + Configuration conf, + FileSystem fs, PcapFilterConfigurator<T> filterImpl) throws IOException { - return job; + return mrJob; } } @Test public void job_succeeds_synchronously() throws Exception { - when(job.isComplete()).thenReturn(true); + pageableResult = new PcapPages( + Arrays.asList(new Path("1.txt"), new Path("2.txt"), new Path("3.txt"))); + when(finalizer.finalizeJob(any())).thenReturn(pageableResult); + when(mrJob.isComplete()).thenReturn(true); when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); - when(job.getStatus()).thenReturn(mrStatus); - TestJob testJob = new TestJob(); - Statusable statusable = testJob.query( - Optional.empty(), - basePath, - baseOutPath, - startTime, - endTime, - numReducers, - fixedFields, - hadoopConfig, - 
FileSystem.get(hadoopConfig), - new FixedPcapFilter.Configurator(), - true); - verify(job, times(1)).waitForCompletion(true); + when(mrJob.getStatus()).thenReturn(mrStatus); + Statusable<Path> statusable = testJob.submit(finalizer, config); + Pageable<Path> results = statusable.get(); + Assert.assertThat(results.getSize(), equalTo(3)); JobStatus status = statusable.getStatus(); Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED)); Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); - String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString(); - Assert.assertThat(status.getResultPath(), notNullValue()); - Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath)); + Assert.assertThat(status.getJobId(), equalTo(jobIdVal)); } @Test public void job_fails_synchronously() throws Exception { - when(job.isComplete()).thenReturn(true); + when(mrJob.isComplete()).thenReturn(true); when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.FAILED); - when(job.getStatus()).thenReturn(mrStatus); - TestJob testJob = new TestJob(); - Statusable statusable = testJob.query( - Optional.empty(), - basePath, - baseOutPath, - startTime, - endTime, - numReducers, - fixedFields, - hadoopConfig, - FileSystem.get(hadoopConfig), - new FixedPcapFilter.Configurator(), - true); + when(mrJob.getStatus()).thenReturn(mrStatus); + Statusable<Path> statusable = testJob.submit(finalizer, config); + Pageable<Path> results = statusable.get(); JobStatus status = statusable.getStatus(); Assert.assertThat(status.getState(), equalTo(State.FAILED)); Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); - String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString(); - Assert.assertThat(status.getResultPath(), notNullValue()); - Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath)); + 
Assert.assertThat(results.getSize(), equalTo(0)); } @Test public void job_fails_with_killed_status_synchronously() throws Exception { - when(job.isComplete()).thenReturn(true); + when(mrJob.isComplete()).thenReturn(true); when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED); - when(job.getStatus()).thenReturn(mrStatus); - TestJob testJob = new TestJob(); - Statusable statusable = testJob.query( - Optional.empty(), - basePath, - baseOutPath, - startTime, - endTime, - numReducers, - fixedFields, - hadoopConfig, - FileSystem.get(hadoopConfig), - new FixedPcapFilter.Configurator(), - true); + when(mrJob.getStatus()).thenReturn(mrStatus); + Statusable<Path> statusable = testJob.submit(finalizer, config); + Pageable<Path> results = statusable.get(); JobStatus status = statusable.getStatus(); Assert.assertThat(status.getState(), equalTo(State.KILLED)); Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); - String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString(); - Assert.assertThat(status.getResultPath(), notNullValue()); - Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath)); + Assert.assertThat(results.getSize(), equalTo(0)); } @Test public void job_succeeds_asynchronously() throws Exception { - when(job.isComplete()).thenReturn(true); + // not complete a few times to make sure cancel works as expected + when(mrJob.isComplete()).thenReturn(false, false, false, true); when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); - when(job.getStatus()).thenReturn(mrStatus); - TestJob testJob = new TestJob(); - Statusable statusable = testJob.query( - Optional.empty(), - basePath, - baseOutPath, - startTime, - endTime, - numReducers, - fixedFields, - hadoopConfig, - FileSystem.get(hadoopConfig), - new FixedPcapFilter.Configurator(), - false); + when(mrJob.getStatus()).thenReturn(mrStatus); + Statusable<Path> 
statusable = testJob.submit(finalizer, config); + while (!statusable.isDone()) { + } JobStatus status = statusable.getStatus(); Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED)); Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); - String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString(); - Assert.assertThat(status.getResultPath(), notNullValue()); - Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath)); } @Test public void job_reports_percent_complete() throws Exception { - when(job.isComplete()).thenReturn(false); + when(mrJob.isComplete()).thenReturn(false); when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING); - when(job.getStatus()).thenReturn(mrStatus); - TestJob testJob = new TestJob(); - Statusable statusable = testJob.query( - Optional.empty(), - basePath, - baseOutPath, - startTime, - endTime, - numReducers, - fixedFields, - hadoopConfig, - FileSystem.get(hadoopConfig), - new FixedPcapFilter.Configurator(), - false); - when(job.mapProgress()).thenReturn(0.5f); - when(job.reduceProgress()).thenReturn(0f); + when(mrJob.getStatus()).thenReturn(mrStatus); + Statusable<Path> statusable = testJob.submit(finalizer, config); + when(mrJob.mapProgress()).thenReturn(0.5f); + when(mrJob.reduceProgress()).thenReturn(0f); JobStatus status = statusable.getStatus(); Assert.assertThat(status.getState(), equalTo(State.RUNNING)); Assert.assertThat(status.getDescription(), equalTo("map: 50.0%, reduce: 0.0%")); Assert.assertThat(status.getPercentComplete(), equalTo(25.0)); - when(job.mapProgress()).thenReturn(1.0f); - when(job.reduceProgress()).thenReturn(0.5f); + when(mrJob.mapProgress()).thenReturn(1.0f); + when(mrJob.reduceProgress()).thenReturn(0.5f); status = statusable.getStatus(); Assert.assertThat(status.getPercentComplete(), equalTo(75.0)); Assert.assertThat(status.getDescription(), equalTo("map: 100.0%, reduce: 50.0%"));