Repository: incubator-reef Updated Branches: refs/heads/master fbe7118c4 -> 6c8a46083
[REEF-694] Refactor reef-examples/scheduler to separate core and HTTP logic Move all HTTP-specific logic to http package. Introduce exceptions into core logic. JIRA: [REEF-694](https://issues.apache.org/jira/browse/REEF-694) Pull Request: Closes #445 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/6c8a4608 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/6c8a4608 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/6c8a4608 Branch: refs/heads/master Commit: 6c8a460836c52197b3d700d704480e804eb0a9eb Parents: fbe7118 Author: Brian Cho <[email protected]> Authored: Mon Aug 31 12:08:54 2015 +0900 Committer: Byung-Gon Chun <[email protected]> Committed: Mon Aug 31 18:21:28 2015 +0900 ---------------------------------------------------------------------- .../scheduler/client/SchedulerREEF.java | 2 +- .../examples/scheduler/driver/Scheduler.java | 100 +++++----- .../scheduler/driver/SchedulerDriver.java | 50 ++--- .../scheduler/driver/SchedulerHttpHandler.java | 107 ---------- .../scheduler/driver/SchedulerResponse.java | 114 ----------- .../driver/exceptions/NotFoundException.java | 28 +++ .../exceptions/UnsuccessfulException.java | 28 +++ .../driver/exceptions/package-info.java | 22 +++ .../driver/http/SchedulerHttpHandler.java | 195 +++++++++++++++++++ .../driver/http/SchedulerResponse.java | 114 +++++++++++ .../scheduler/driver/http/package-info.java | 22 +++ 11 files changed, 475 insertions(+), 307 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEF.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEF.java 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEF.java index 15277f1..688d339 100644 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEF.java +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEF.java @@ -22,7 +22,7 @@ import org.apache.commons.cli.ParseException; import org.apache.reef.client.DriverConfiguration; import org.apache.reef.client.REEF; import org.apache.reef.examples.scheduler.driver.SchedulerDriver; -import org.apache.reef.examples.scheduler.driver.SchedulerHttpHandler; +import org.apache.reef.examples.scheduler.driver.http.SchedulerHttpHandler; import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.Configurations; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/Scheduler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/Scheduler.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/Scheduler.java index 6dd6f15..752d171 100644 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/Scheduler.java +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/Scheduler.java @@ -22,6 +22,8 @@ import org.apache.reef.driver.context.ActiveContext; import org.apache.reef.driver.task.TaskConfiguration; import org.apache.reef.examples.library.Command; import org.apache.reef.examples.library.ShellTask; +import org.apache.reef.examples.scheduler.driver.exceptions.NotFoundException; +import org.apache.reef.examples.scheduler.driver.exceptions.UnsuccessfulException; import org.apache.reef.tang.Configuration; import 
org.apache.reef.tang.Configurations; import org.apache.reef.tang.Tang; @@ -30,7 +32,9 @@ import javax.annotation.concurrent.ThreadSafe; import javax.inject.Inject; import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -87,93 +91,80 @@ final class Scheduler { /** * Update the record of task to mark it as canceled. */ - public synchronized SchedulerResponse cancelTask(final int taskId) { + public synchronized int cancelTask(final int taskId) throws UnsuccessfulException, NotFoundException { if (getTask(taskId, runningTasks) != null) { - return SchedulerResponse.forbidden("The task " + taskId + " is running"); + throw new UnsuccessfulException("The task " + taskId + " is running"); } else if (getTask(taskId, finishedTasks) != null) { - return SchedulerResponse.forbidden("The task " + taskId + " has been finished"); + throw new UnsuccessfulException("The task " + taskId + " has finished"); } final TaskEntity task = getTask(taskId, taskQueue); if (task == null) { final String message = new StringBuilder().append("Task with ID ").append(taskId).append(" is not found").toString(); - return SchedulerResponse.notFound(message); + throw new NotFoundException(message); } else { taskQueue.remove(task); canceledTasks.add(task); - return SchedulerResponse.ok("Canceled " + taskId); + return taskId; } } /** * Clear the pending list. + * @return the number of removed tasks */ - public synchronized SchedulerResponse clear() { + public synchronized int clear() { final int count = taskQueue.size(); for (final TaskEntity task : taskQueue) { canceledTasks.add(task); } taskQueue.clear(); - return SchedulerResponse.ok(count + " tasks removed."); + return count; } /** * Get the list of Tasks, which are grouped by the states. 
+ * @return a Map grouped by task state, each with a List of taskIds. */ - public synchronized SchedulerResponse getList() { - final StringBuilder sb = new StringBuilder(); - sb.append("Running :"); - for (final TaskEntity running : runningTasks) { - sb.append(" ").append(running.getId()); - } - - sb.append("\nWaiting :"); - for (final TaskEntity waiting : taskQueue) { - sb.append(" ").append(waiting.getId()); - } + public synchronized Map<String, List<Integer>> getList() { + final Map<String, List<Integer>> tasks = new LinkedHashMap<>(); - sb.append("\nFinished :"); - for (final TaskEntity finished : finishedTasks) { - sb.append(" ").append(finished.getId()); - } + tasks.put("Running", getTaskIdList(runningTasks)); + tasks.put("Waiting", getTaskIdList(taskQueue)); + tasks.put("Finished", getTaskIdList(finishedTasks)); + tasks.put("Canceled", getTaskIdList(canceledTasks)); - sb.append("\nCanceled :"); - for (final TaskEntity canceled : canceledTasks) { - sb.append(" ").append(canceled.getId()); - } - return SchedulerResponse.ok(sb.toString()); + return tasks; } /** * Get the status of a Task. + * @return the status of the Task. 
*/ - public synchronized SchedulerResponse getTaskStatus(final int taskId) { + public synchronized String getTaskStatus(final int taskId) throws NotFoundException { - for (final TaskEntity running : runningTasks) { - if (taskId == running.getId()) { - return SchedulerResponse.ok("Running : " + running.toString()); - } + final TaskEntity running = getTask(taskId, runningTasks); + if (running != null) { + return "Running : " + running.toString(); } - for (final TaskEntity waiting : taskQueue) { - if (taskId == waiting.getId()) { - return SchedulerResponse.ok("Waiting : " + waiting.toString()); - } + final TaskEntity waiting = getTask(taskId, taskQueue); + if (waiting != null) { + return "Waiting : " + waiting.toString(); } - for (final TaskEntity finished : finishedTasks) { - if (taskId == finished.getId()) { - return SchedulerResponse.ok("Finished : " + finished.toString()); - } + final TaskEntity finished = getTask(taskId, finishedTasks); + if (finished != null) { + return "Finished : " + finished.toString(); } - for (final TaskEntity finished : canceledTasks) { - if (taskId == finished.getId()) { - return SchedulerResponse.ok("Canceled: " + finished.toString()); - } + final TaskEntity canceled = getTask(taskId, canceledTasks); + if (canceled != null) { + return "Canceled: " + canceled.toString(); } - return SchedulerResponse.notFound( + + throw new NotFoundException( new StringBuilder().append("Task with ID ").append(taskId).append(" is not found").toString()); } @@ -215,17 +206,26 @@ final class Scheduler { } /** + * @return A list of taskIds in the TaskEntity collection. + */ + private static List<Integer> getTaskIdList(final Collection<TaskEntity> tasks) { + final List<Integer> taskIdList = new ArrayList<>(tasks.size()); + for (final TaskEntity task : tasks) { + taskIdList.add(task.getId()); + } + return taskIdList; + } + + /** * Iterate over the collection to find a TaskEntity with ID. 
*/ - private TaskEntity getTask(final int taskId, final Collection<TaskEntity> tasks) { - TaskEntity result = null; + private static TaskEntity getTask(final int taskId, final Collection<TaskEntity> tasks) { for (final TaskEntity task : tasks) { if (taskId == task.getId()) { - result = task; - break; + return task; } } - return result; + return null; } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerDriver.java index 64e1770..8e1288d 100644 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerDriver.java +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerDriver.java @@ -25,6 +25,8 @@ import org.apache.reef.driver.evaluator.EvaluatorRequest; import org.apache.reef.driver.evaluator.EvaluatorRequestor; import org.apache.reef.driver.task.CompletedTask; import org.apache.reef.examples.scheduler.client.SchedulerREEF; +import org.apache.reef.examples.scheduler.driver.exceptions.NotFoundException; +import org.apache.reef.examples.scheduler.driver.exceptions.UnsuccessfulException; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.tang.annotations.Unit; import org.apache.reef.wake.EventHandler; @@ -34,6 +36,7 @@ import org.apache.reef.wake.time.event.StartTime; import javax.annotation.concurrent.GuardedBy; import javax.inject.Inject; import java.util.List; +import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; @@ -171,30 +174,25 @@ public final class SchedulerDriver { } } + /** * Get the list of tasks in the scheduler. 
*/ - public synchronized SchedulerResponse getList() { + public synchronized Map<String, List<Integer>> getList() { return scheduler.getList(); } /** * Clear all the Tasks from the waiting queue. */ - public synchronized SchedulerResponse clearList() { + public synchronized int clearList() { return scheduler.clear(); } /** * Get the status of a task. */ - public SchedulerResponse getTaskStatus(final List<String> args) { - if (args.size() != 1) { - return SchedulerResponse.badRequest("Usage : only one ID at a time"); - } - - final Integer taskId = Integer.valueOf(args.get(0)); - + public String getTaskStatus(final int taskId) throws NotFoundException { synchronized (SchedulerDriver.this) { return scheduler.getTaskStatus(taskId); } @@ -204,13 +202,7 @@ public final class SchedulerDriver { * Cancel a Task waiting on the queue. A task cannot be canceled * once it is running. */ - public SchedulerResponse cancelTask(final List<String> args) { - if (args.size() != 1) { - return SchedulerResponse.badRequest("Usage : only one ID at a time"); - } - - final Integer taskId = Integer.valueOf(args.get(0)); - + public int cancelTask(final int taskId) throws NotFoundException, UnsuccessfulException { synchronized (SchedulerDriver.this) { return scheduler.cancelTask(taskId); } @@ -219,14 +211,8 @@ public final class SchedulerDriver { /** * Submit a command to schedule. 
*/ - public SchedulerResponse submitCommands(final List<String> args) { - if (args.size() != 1) { - return SchedulerResponse.badRequest("Usage : only one command at a time"); - } - - final String command = args.get(0); + public int submitCommand(final String command) { final Integer id; - synchronized (SchedulerDriver.this) { id = scheduler.assignTaskId(); scheduler.addTask(new TaskEntity(id, command)); @@ -237,7 +223,7 @@ public final class SchedulerDriver { requestEvaluator(1); } } - return SchedulerResponse.ok("Task ID : " + id); + return id; } /** @@ -245,26 +231,20 @@ public final class SchedulerDriver { * Request more evaluators in case there are pending tasks * in the queue and the number of evaluators is less than the limit. */ - public SchedulerResponse setMaxEvaluators(final List<String> args) { - if (args.size() != 1) { - return SchedulerResponse.badRequest("Usage : Only one value can be used"); - } - - final int nTarget = Integer.valueOf(args.get(0)); - + public int setMaxEvaluators(final int targetNum) throws UnsuccessfulException { synchronized (SchedulerDriver.this) { - if (nTarget < nActiveEval + nRequestedEval) { - return SchedulerResponse.forbidden(nActiveEval + nRequestedEval + + if (targetNum < nActiveEval + nRequestedEval) { + throw new UnsuccessfulException(nActiveEval + nRequestedEval + " evaluators are used now. 
Should be larger than that."); } - nMaxEval = nTarget; + nMaxEval = targetNum; if (scheduler.hasPendingTasks()) { final int nToRequest = Math.min(scheduler.getNumPendingTasks(), nMaxEval - nActiveEval) - nRequestedEval; requestEvaluator(nToRequest); } - return SchedulerResponse.ok("You can use evaluators up to " + nMaxEval + " evaluators."); + return nMaxEval; } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerHttpHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerHttpHandler.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerHttpHandler.java deleted file mode 100644 index 7d05e9b..0000000 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerHttpHandler.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.reef.examples.scheduler.driver; - -import org.apache.reef.tang.InjectionFuture; -import org.apache.reef.webserver.HttpHandler; -import org.apache.reef.webserver.ParsedHttpRequest; - -import javax.inject.Inject; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * Receive HttpRequest so that it can handle the command list. - */ -public final class SchedulerHttpHandler implements HttpHandler { - private final InjectionFuture<SchedulerDriver> schedulerDriver; - - private String uriSpecification = "reef-example-scheduler"; - - @Inject - private SchedulerHttpHandler(final InjectionFuture<SchedulerDriver> schedulerDriver) { - this.schedulerDriver = schedulerDriver; - } - - @Override - public String getUriSpecification() { - return uriSpecification; - } - - @Override - public void setUriSpecification(final String s) { - uriSpecification = s; - } - - /** - * HttpRequest handler. You must specify UriSpecification and REST API version. 
- * The request url is http://{address}:{port}/reef-example-scheduler/v1 - * - * APIs - * /list to get the status list for all tasks - * /status?id={id} to query the status of such a task, given id - * /submit?cmd={cmd} to submit a Task, which returns its id - * /cancel?id={id} to cancel the task's execution - * /max-eval?num={num} to set the maximum number of evaluators - * /clear to clear the waiting queue - */ - @Override - public void onHttpRequest(final ParsedHttpRequest request, final HttpServletResponse response) - throws IOException, ServletException { - final String target = request.getTargetEntity().toLowerCase(); - final Map<String, List<String>> queryMap = request.getQueryMap(); - - final SchedulerResponse result; - switch (target) { - case "list": - result = schedulerDriver.get().getList(); - break; - case "clear": - result = schedulerDriver.get().clearList(); - break; - case "status": - result = schedulerDriver.get().getTaskStatus(queryMap.get("id")); - break; - case "submit": - result = schedulerDriver.get().submitCommands(queryMap.get("cmd")); - break; - case "cancel": - result = schedulerDriver.get().cancelTask(queryMap.get("id")); - break; - case "max-eval": - result = schedulerDriver.get().setMaxEvaluators(queryMap.get("num")); - break; - default: - result = SchedulerResponse.notFound("Unsupported operation"); - } - - // Send response to the http client - final int status = result.getStatus(); - final String message= result.getMessage(); - - if (result.isOK()) { - response.getOutputStream().println(message); - } else { - response.sendError(status, message); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerResponse.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerResponse.java 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerResponse.java deleted file mode 100644 index e3d1272..0000000 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerResponse.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.examples.scheduler.driver; - -/** - * This class specifies the response from the Scheduler. - * It includes the status code and message. - */ -final class SchedulerResponse { - /** - * 200 OK : The request succeeded normally. - */ - private static final int SC_OK = 200; - - /** - * 400 BAD REQUEST : The request is syntactically incorrect. - */ - private static final int SC_BAD_REQUEST = 400; - - /** - * 403 FORBIDDEN : Syntactically okay but refused to process. - */ - private static final int SC_FORBIDDEN = 403; - - /** - * 404 NOT FOUND : The resource is not available. - */ - private static final int SC_NOT_FOUND = 404; - - /** - * Create a response with OK status. - */ - public static SchedulerResponse ok(final String message){ - return new SchedulerResponse(SC_OK, message); - } - - /** - * Create a response with BAD_REQUEST status. 
- */ - public static SchedulerResponse badRequest(final String message){ - return new SchedulerResponse(SC_BAD_REQUEST, message); - } - - /** - * Create a response with FORBIDDEN status. - */ - public static SchedulerResponse forbidden(final String message){ - return new SchedulerResponse(SC_FORBIDDEN, message); - } - - /** - * Create a response with NOT FOUND status. - */ - public static SchedulerResponse notFound(final String message){ - return new SchedulerResponse(SC_NOT_FOUND, message); - } - - /** - * Return {@code true} if the response is OK. - */ - public boolean isOK(){ - return this.status == SC_OK; - } - - /** - * Status code of the request based on RFC 2068. - */ - private int status; - - /** - * Message to send. - */ - private String message; - - /** - * Constructor using status code and message. - * @param status - * @param message - */ - private SchedulerResponse(final int status, final String message) { - this.status = status; - this.message = message; - } - - /** - * Return the status code of this response. - */ - int getStatus() { - return status; - } - - /** - * Return the message of this response. - */ - String getMessage() { - return message; - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/NotFoundException.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/NotFoundException.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/NotFoundException.java new file mode 100644 index 0000000..5023e33 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/NotFoundException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.scheduler.driver.exceptions; + +/** + * An object specified in the operation was not found. + */ +public final class NotFoundException extends Exception { + public NotFoundException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/UnsuccessfulException.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/UnsuccessfulException.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/UnsuccessfulException.java new file mode 100644 index 0000000..4163aef --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/UnsuccessfulException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.scheduler.driver.exceptions; + +/** + * The operation was unsuccessful, for reasons stated in the message. + */ +public final class UnsuccessfulException extends Exception { + public UnsuccessfulException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/package-info.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/package-info.java new file mode 100644 index 0000000..0bc94f6 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Task scheduler example exceptions. + */ +package org.apache.reef.examples.scheduler.driver.exceptions; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/SchedulerHttpHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/SchedulerHttpHandler.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/SchedulerHttpHandler.java new file mode 100644 index 0000000..c32e828 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/SchedulerHttpHandler.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.scheduler.driver.http; + +import org.apache.reef.examples.scheduler.driver.SchedulerDriver; +import org.apache.reef.examples.scheduler.driver.exceptions.NotFoundException; +import org.apache.reef.examples.scheduler.driver.exceptions.UnsuccessfulException; +import org.apache.reef.tang.InjectionFuture; +import org.apache.reef.webserver.HttpHandler; +import org.apache.reef.webserver.ParsedHttpRequest; + +import javax.inject.Inject; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Receive HttpRequest so that it can handle the command list. + */ +public final class SchedulerHttpHandler implements HttpHandler { + private final InjectionFuture<SchedulerDriver> schedulerDriver; + + private String uriSpecification = "reef-example-scheduler"; + + @Inject + private SchedulerHttpHandler(final InjectionFuture<SchedulerDriver> schedulerDriver) { + this.schedulerDriver = schedulerDriver; + } + + @Override + public String getUriSpecification() { + return uriSpecification; + } + + @Override + public void setUriSpecification(final String s) { + uriSpecification = s; + } + + /** + * HttpRequest handler. You must specify UriSpecification and API version. 
+ * The request url is http://{address}:{port}/reef-example-scheduler/v1 + * + * APIs + * /list to get the status list for all tasks + * /status?id={id} to query the status of such a task, given id + * /submit?cmd={cmd} to submit a Task, which returns its id + * /cancel?id={id} to cancel the task's execution + * /max-eval?num={num} to set the maximum number of evaluators + * /clear to clear the waiting queue + */ + @Override + public void onHttpRequest(final ParsedHttpRequest request, final HttpServletResponse response) + throws IOException, ServletException { + final String target = request.getTargetEntity().toLowerCase(); + final Map<String, List<String>> queryMap = request.getQueryMap(); + + final SchedulerResponse result; + switch (target) { + case "list": + result = onList(); + break; + case "clear": + result = onClear(); + break; + case "status": + result = onStatus(queryMap); + break; + case "submit": + result = onSubmit(queryMap); + break; + case "cancel": + result = onCancel(queryMap); + break; + case "max-eval": + result = onMaxEval(queryMap); + break; + default: + result = SchedulerResponse.notFound("Unsupported operation"); + } + + // Send response to the http client + final int status = result.getStatus(); + final String message= result.getMessage(); + + if (result.isOK()) { + response.getOutputStream().println(message); + } else { + response.sendError(status, message); + } + } + + private SchedulerResponse onList() { + final Map<String, List<Integer>> listMap = schedulerDriver.get().getList(); + + final StringBuilder sb = new StringBuilder(); + for (final Map.Entry<String, List<Integer>> entry : listMap.entrySet()) { + sb.append("\n").append(entry.getKey()).append(" :"); + for (final int taskId : entry.getValue()) { + sb.append(" ").append(taskId); + } + } + return SchedulerResponse.ok(sb.toString()); + } + + private SchedulerResponse onClear() { + final int count = schedulerDriver.get().clearList(); + return SchedulerResponse.ok(count + " tasks removed."); + } + 
+  private SchedulerResponse onStatus(final Map<String, List<String>> queryMap) {
+    final List<String> args = queryMap.get("id"); // Map.get returns null when "id" is absent
+    if (args == null || args.size() != 1) {
+      return SchedulerResponse.badRequest("Usage : only one ID at a time");
+    }
+
+    try {
+
+      final int taskId = Integer.parseInt(args.get(0));
+      return SchedulerResponse.ok(schedulerDriver.get().getTaskStatus(taskId));
+
+    } catch (final NotFoundException e) {
+      return SchedulerResponse.notFound(e.getMessage());
+    } catch (final NumberFormatException e) {
+      return SchedulerResponse.badRequest("Usage : ID must be an integer");
+    }
+  }
+
+  private SchedulerResponse onSubmit(final Map<String, List<String>> queryMap) {
+    final List<String> args = queryMap.get("cmd"); // Map.get returns null when "cmd" is absent
+    if (args == null || args.size() != 1) {
+      return SchedulerResponse.badRequest("Usage : only one command at a time");
+    }
+
+    return SchedulerResponse.ok("Task ID : " + schedulerDriver.get().submitCommand(args.get(0)));
+  }
+
+  private SchedulerResponse onCancel(final Map<String, List<String>> queryMap) {
+    final List<String> args = queryMap.get("id"); // Map.get returns null when "id" is absent
+    if (args == null || args.size() != 1) {
+      return SchedulerResponse.badRequest("Usage : only one ID at a time");
+    }
+
+    try {
+
+      final int taskId = Integer.parseInt(args.get(0));
+      final int canceledId = schedulerDriver.get().cancelTask(taskId);
+      return SchedulerResponse.ok("Canceled " + canceledId);
+
+    } catch (final NotFoundException e) {
+      return SchedulerResponse.notFound(e.getMessage());
+    } catch (final UnsuccessfulException e) {
+      return SchedulerResponse.forbidden(e.getMessage());
+    } catch (final NumberFormatException e) {
+      return SchedulerResponse.badRequest("Usage : ID must be an integer");
+    }
+  }
+
+  private SchedulerResponse onMaxEval(final Map<String, List<String>> queryMap) {
+    final List<String> args = queryMap.get("num"); // Map.get returns null when "num" is absent
+    if (args == null || args.size() != 1) {
+      return SchedulerResponse.badRequest("Usage : Only one value can be used");
+    }
+
+    try {
+
+      final int targetNum = Integer.parseInt(args.get(0));
+      final int maxEval = schedulerDriver.get().setMaxEvaluators(targetNum);
+      return SchedulerResponse.ok("You can use up to " + maxEval + " evaluators.");
+
+    } catch (final UnsuccessfulException e) {
+      return SchedulerResponse.forbidden(e.getMessage());
+    } catch (final NumberFormatException e) {
+      return SchedulerResponse.badRequest("Usage : num must be an integer");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/SchedulerResponse.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/SchedulerResponse.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/SchedulerResponse.java
new file mode 100644
index 0000000..bd03fa8
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/SchedulerResponse.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler.driver.http;
+
+/**
+ * This class specifies the response from the Scheduler.
+ * It is an immutable pair of a status code and a message.
+ */
+final class SchedulerResponse {
+  /**
+   * 200 OK : The request succeeded normally.
+   */
+  private static final int SC_OK = 200;
+
+  /**
+   * 400 BAD REQUEST : The request is syntactically incorrect.
+   */
+  private static final int SC_BAD_REQUEST = 400;
+
+  /**
+   * 403 FORBIDDEN : Syntactically okay but refused to process.
+   */
+  private static final int SC_FORBIDDEN = 403;
+
+  /**
+   * 404 NOT FOUND : The resource is not available.
+   */
+  private static final int SC_NOT_FOUND = 404;
+
+  /**
+   * Create a response with OK status.
+   */
+  public static SchedulerResponse ok(final String message) {
+    return new SchedulerResponse(SC_OK, message);
+  }
+
+  /**
+   * Create a response with BAD_REQUEST status.
+   */
+  public static SchedulerResponse badRequest(final String message) {
+    return new SchedulerResponse(SC_BAD_REQUEST, message);
+  }
+
+  /**
+   * Create a response with FORBIDDEN status.
+   */
+  public static SchedulerResponse forbidden(final String message) {
+    return new SchedulerResponse(SC_FORBIDDEN, message);
+  }
+
+  /**
+   * Create a response with NOT FOUND status.
+   */
+  public static SchedulerResponse notFound(final String message) {
+    return new SchedulerResponse(SC_NOT_FOUND, message);
+  }
+
+  /**
+   * Return {@code true} if the response is OK.
+   */
+  public boolean isOK() {
+    return this.status == SC_OK;
+  }
+
+  /**
+   * Status code of the response based on RFC 2068.
+   */
+  private final int status;
+
+  /**
+   * Message to send.
+   */
+  private final String message;
+
+  /**
+   * Constructor using status code and message.
+   * @param status the status code of this response (one of the SC_* constants)
+   * @param message the message to send with this response
+   */
+  private SchedulerResponse(final int status, final String message) {
+    this.status = status;
+    this.message = message;
+  }
+
+  /**
+   * Return the status code of this response.
+   */
+  int getStatus() {
+    return status;
+  }
+
+  /**
+   * Return the message of this response.
+   */
+  String getMessage() {
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/package-info.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/package-info.java
new file mode 100644
index 0000000..6e64375
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Task scheduler example HTTP API.
+ */
+package org.apache.reef.examples.scheduler.driver.http;
