Repository: incubator-reef
Updated Branches:
  refs/heads/master fbe7118c4 -> 6c8a46083


[REEF-694] Refactor reef-examples/scheduler to separate core and HTTP logic

Move all HTTP-specific logic to http package.
Introduce exceptions into core logic.

JIRA:
  [REEF-694](https://issues.apache.org/jira/browse/REEF-694)

Pull Request:
  Closes #445


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/6c8a4608
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/6c8a4608
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/6c8a4608

Branch: refs/heads/master
Commit: 6c8a460836c52197b3d700d704480e804eb0a9eb
Parents: fbe7118
Author: Brian Cho <[email protected]>
Authored: Mon Aug 31 12:08:54 2015 +0900
Committer: Byung-Gon Chun <[email protected]>
Committed: Mon Aug 31 18:21:28 2015 +0900

----------------------------------------------------------------------
 .../scheduler/client/SchedulerREEF.java         |   2 +-
 .../examples/scheduler/driver/Scheduler.java    | 100 +++++-----
 .../scheduler/driver/SchedulerDriver.java       |  50 ++---
 .../scheduler/driver/SchedulerHttpHandler.java  | 107 ----------
 .../scheduler/driver/SchedulerResponse.java     | 114 -----------
 .../driver/exceptions/NotFoundException.java    |  28 +++
 .../exceptions/UnsuccessfulException.java       |  28 +++
 .../driver/exceptions/package-info.java         |  22 +++
 .../driver/http/SchedulerHttpHandler.java       | 195 +++++++++++++++++++
 .../driver/http/SchedulerResponse.java          | 114 +++++++++++
 .../scheduler/driver/http/package-info.java     |  22 +++
 11 files changed, 475 insertions(+), 307 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEF.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEF.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEF.java
index 15277f1..688d339 100644
--- 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEF.java
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEF.java
@@ -22,7 +22,7 @@ import org.apache.commons.cli.ParseException;
 import org.apache.reef.client.DriverConfiguration;
 import org.apache.reef.client.REEF;
 import org.apache.reef.examples.scheduler.driver.SchedulerDriver;
-import org.apache.reef.examples.scheduler.driver.SchedulerHttpHandler;
+import org.apache.reef.examples.scheduler.driver.http.SchedulerHttpHandler;
 import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Configurations;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/Scheduler.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/Scheduler.java
index 6dd6f15..752d171 100644
--- 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/Scheduler.java
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/Scheduler.java
@@ -22,6 +22,8 @@ import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.driver.task.TaskConfiguration;
 import org.apache.reef.examples.library.Command;
 import org.apache.reef.examples.library.ShellTask;
+import org.apache.reef.examples.scheduler.driver.exceptions.NotFoundException;
+import 
org.apache.reef.examples.scheduler.driver.exceptions.UnsuccessfulException;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Configurations;
 import org.apache.reef.tang.Tang;
@@ -30,7 +32,9 @@ import javax.annotation.concurrent.ThreadSafe;
 import javax.inject.Inject;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -87,93 +91,80 @@ final class Scheduler {
   /**
    * Update the record of task to mark it as canceled.
    */
-  public synchronized SchedulerResponse cancelTask(final int taskId) {
+  public synchronized int cancelTask(final int taskId) throws 
UnsuccessfulException, NotFoundException {
     if (getTask(taskId, runningTasks) != null) {
-      return SchedulerResponse.forbidden("The task " + taskId + " is running");
+      throw new UnsuccessfulException("The task " + taskId + " is running");
     } else if (getTask(taskId, finishedTasks) != null) {
-      return SchedulerResponse.forbidden("The task " + taskId + " has been 
finished");
+      throw new UnsuccessfulException("The task " + taskId + " has finished");
     }
 
     final TaskEntity task = getTask(taskId, taskQueue);
     if (task == null) {
       final String message =
           new StringBuilder().append("Task with ID ").append(taskId).append(" 
is not found").toString();
-      return SchedulerResponse.notFound(message);
+      throw new NotFoundException(message);
     } else {
       taskQueue.remove(task);
       canceledTasks.add(task);
-      return SchedulerResponse.ok("Canceled " + taskId);
+      return taskId;
     }
   }
 
   /**
    * Clear the pending list.
+   * @return the number of removed tasks
    */
-  public synchronized SchedulerResponse clear() {
+  public synchronized int clear() {
     final int count = taskQueue.size();
     for (final TaskEntity task : taskQueue) {
       canceledTasks.add(task);
     }
     taskQueue.clear();
-    return SchedulerResponse.ok(count + " tasks removed.");
+    return count;
   }
 
   /**
    * Get the list of Tasks, which are grouped by the states.
+   * @return a Map grouped by task state, each with a List of taskIds.
    */
-  public synchronized SchedulerResponse getList() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("Running :");
-    for (final TaskEntity running : runningTasks) {
-      sb.append(" ").append(running.getId());
-    }
-
-    sb.append("\nWaiting :");
-    for (final TaskEntity waiting : taskQueue) {
-      sb.append(" ").append(waiting.getId());
-    }
+  public synchronized Map<String, List<Integer>> getList() {
+    final Map<String, List<Integer>> tasks = new LinkedHashMap<>();
 
-    sb.append("\nFinished :");
-    for (final TaskEntity finished : finishedTasks) {
-      sb.append(" ").append(finished.getId());
-    }
+    tasks.put("Running", getTaskIdList(runningTasks));
+    tasks.put("Waiting", getTaskIdList(taskQueue));
+    tasks.put("Finished", getTaskIdList(finishedTasks));
+    tasks.put("Canceled", getTaskIdList(canceledTasks));
 
-    sb.append("\nCanceled :");
-    for (final TaskEntity canceled : canceledTasks) {
-      sb.append(" ").append(canceled.getId());
-    }
-    return SchedulerResponse.ok(sb.toString());
+    return tasks;
   }
 
   /**
    * Get the status of a Task.
+   * @return the status of the Task.
    */
-  public synchronized SchedulerResponse getTaskStatus(final int taskId) {
+  public synchronized String getTaskStatus(final int taskId) throws 
NotFoundException {
 
-    for (final TaskEntity running : runningTasks) {
-      if (taskId == running.getId()) {
-        return SchedulerResponse.ok("Running : " + running.toString());
-      }
+    final TaskEntity running = getTask(taskId, runningTasks);
+    if (running != null) {
+      return "Running : " + running.toString();
     }
 
-    for (final TaskEntity waiting : taskQueue) {
-      if (taskId == waiting.getId()) {
-        return SchedulerResponse.ok("Waiting : " + waiting.toString());
-      }
+    final TaskEntity waiting = getTask(taskId, taskQueue);
+    if (waiting != null) {
+      return "Waiting : " + waiting.toString();
     }
 
-    for (final TaskEntity finished : finishedTasks) {
-      if (taskId == finished.getId()) {
-        return SchedulerResponse.ok("Finished : " + finished.toString());
-      }
+    final TaskEntity finished = getTask(taskId, finishedTasks);
+    if (finished != null) {
+      return "Finished : " + finished.toString();
     }
 
-    for (final TaskEntity finished : canceledTasks) {
-      if (taskId == finished.getId()) {
-        return SchedulerResponse.ok("Canceled: " + finished.toString());
-      }
+    final TaskEntity canceled = getTask(taskId, canceledTasks);
+    if (canceled != null) {
+      return "Canceled: " + canceled.toString();
     }
-    return SchedulerResponse.notFound(
+
+    throw new NotFoundException(
         new StringBuilder().append("Task with ID ").append(taskId).append(" is 
not found").toString());
   }
 
@@ -215,17 +206,26 @@ final class Scheduler {
   }
 
   /**
+   * @return A list of taskIds in the TaskEntity collection.
+   */
+  private static List<Integer> getTaskIdList(final Collection<TaskEntity> 
tasks) {
+    final List<Integer> taskIdList = new ArrayList<>(tasks.size());
+    for (final TaskEntity task : tasks) {
+      taskIdList.add(task.getId());
+    }
+    return taskIdList;
+  }
+
+  /**
    * Iterate over the collection to find a TaskEntity with ID.
    */
-  private TaskEntity getTask(final int taskId, final Collection<TaskEntity> 
tasks) {
-    TaskEntity result = null;
+  private static TaskEntity getTask(final int taskId, final 
Collection<TaskEntity> tasks) {
     for (final TaskEntity task : tasks) {
       if (taskId == task.getId()) {
-        result = task;
-        break;
+        return task;
       }
     }
-    return result;
+    return null;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerDriver.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerDriver.java
index 64e1770..8e1288d 100644
--- 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerDriver.java
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerDriver.java
@@ -25,6 +25,8 @@ import org.apache.reef.driver.evaluator.EvaluatorRequest;
 import org.apache.reef.driver.evaluator.EvaluatorRequestor;
 import org.apache.reef.driver.task.CompletedTask;
 import org.apache.reef.examples.scheduler.client.SchedulerREEF;
+import org.apache.reef.examples.scheduler.driver.exceptions.NotFoundException;
+import 
org.apache.reef.examples.scheduler.driver.exceptions.UnsuccessfulException;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.annotations.Unit;
 import org.apache.reef.wake.EventHandler;
@@ -34,6 +36,7 @@ import org.apache.reef.wake.time.event.StartTime;
 import javax.annotation.concurrent.GuardedBy;
 import javax.inject.Inject;
 import java.util.List;
+import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -171,30 +174,25 @@ public final class SchedulerDriver {
     }
   }
 
+
   /**
    * Get the list of tasks in the scheduler.
    */
-  public synchronized SchedulerResponse getList() {
+  public synchronized Map<String, List<Integer>> getList() {
     return scheduler.getList();
   }
 
   /**
    * Clear all the Tasks from the waiting queue.
    */
-  public synchronized SchedulerResponse clearList() {
+  public synchronized int clearList() {
     return scheduler.clear();
   }
 
   /**
    * Get the status of a task.
    */
-  public SchedulerResponse getTaskStatus(final List<String> args) {
-    if (args.size() != 1) {
-      return SchedulerResponse.badRequest("Usage : only one ID at a time");
-    }
-
-    final Integer taskId = Integer.valueOf(args.get(0));
-
+  public String getTaskStatus(final int taskId) throws NotFoundException {
     synchronized (SchedulerDriver.this) {
       return scheduler.getTaskStatus(taskId);
     }
@@ -204,13 +202,7 @@ public final class SchedulerDriver {
    * Cancel a Task waiting on the queue. A task cannot be canceled
    * once it is running.
    */
-  public SchedulerResponse cancelTask(final List<String> args) {
-    if (args.size() != 1) {
-      return SchedulerResponse.badRequest("Usage : only one ID at a time");
-    }
-
-    final Integer taskId = Integer.valueOf(args.get(0));
-
+  public int cancelTask(final int taskId) throws NotFoundException, 
UnsuccessfulException {
     synchronized (SchedulerDriver.this) {
       return scheduler.cancelTask(taskId);
     }
@@ -219,14 +211,8 @@ public final class SchedulerDriver {
   /**
    * Submit a command to schedule.
    */
-  public SchedulerResponse submitCommands(final List<String> args) {
-    if (args.size() != 1) {
-      return SchedulerResponse.badRequest("Usage : only one command at a 
time");
-    }
-
-    final String command = args.get(0);
+  public int submitCommand(final String command) {
     final Integer id;
-
     synchronized (SchedulerDriver.this) {
       id = scheduler.assignTaskId();
       scheduler.addTask(new TaskEntity(id, command));
@@ -237,7 +223,7 @@ public final class SchedulerDriver {
         requestEvaluator(1);
       }
     }
-    return SchedulerResponse.ok("Task ID : " + id);
+    return id;
   }
 
   /**
@@ -245,26 +231,20 @@ public final class SchedulerDriver {
    * Request more evaluators in case there are pending tasks
    * in the queue and the number of evaluators is less than the limit.
    */
-  public SchedulerResponse setMaxEvaluators(final List<String> args) {
-    if (args.size() != 1) {
-      return SchedulerResponse.badRequest("Usage : Only one value can be 
used");
-    }
-
-    final int nTarget = Integer.valueOf(args.get(0));
-
+  public int setMaxEvaluators(final int targetNum) throws 
UnsuccessfulException {
     synchronized (SchedulerDriver.this) {
-      if (nTarget < nActiveEval + nRequestedEval) {
-        return SchedulerResponse.forbidden(nActiveEval + nRequestedEval +
+      if (targetNum < nActiveEval + nRequestedEval) {
+        throw new UnsuccessfulException(nActiveEval + nRequestedEval +
             " evaluators are used now. Should be larger than that.");
       }
-      nMaxEval = nTarget;
+      nMaxEval = targetNum;
 
       if (scheduler.hasPendingTasks()) {
         final int nToRequest =
             Math.min(scheduler.getNumPendingTasks(), nMaxEval - nActiveEval) - 
nRequestedEval;
         requestEvaluator(nToRequest);
       }
-      return SchedulerResponse.ok("You can use evaluators up to " + nMaxEval + 
" evaluators.");
+      return nMaxEval;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerHttpHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerHttpHandler.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerHttpHandler.java
deleted file mode 100644
index 7d05e9b..0000000
--- 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerHttpHandler.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.examples.scheduler.driver;
-
-import org.apache.reef.tang.InjectionFuture;
-import org.apache.reef.webserver.HttpHandler;
-import org.apache.reef.webserver.ParsedHttpRequest;
-
-import javax.inject.Inject;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Receive HttpRequest so that it can handle the command list.
- */
-public final class SchedulerHttpHandler implements HttpHandler {
-  private final InjectionFuture<SchedulerDriver> schedulerDriver;
-
-  private String uriSpecification = "reef-example-scheduler";
-
-  @Inject
-  private SchedulerHttpHandler(final InjectionFuture<SchedulerDriver> 
schedulerDriver) {
-    this.schedulerDriver = schedulerDriver;
-  }
-
-  @Override
-  public String getUriSpecification() {
-    return uriSpecification;
-  }
-
-  @Override
-  public void setUriSpecification(final String s) {
-    uriSpecification = s;
-  }
-
-  /**
-   * HttpRequest handler. You must specify UriSpecification and REST API 
version.
-   * The request url is http://{address}:{port}/reef-example-scheduler/v1
-   *
-   * APIs
-   *   /list                to get the status list for all tasks
-   *   /status?id={id}      to query the status of such a task, given id
-   *   /submit?cmd={cmd}    to submit a Task, which returns its id
-   *   /cancel?id={id}      to cancel the task's execution
-   *   /max-eval?num={num}  to set the maximum number of evaluators
-   *   /clear               to clear the waiting queue
-   */
-  @Override
-  public void onHttpRequest(final ParsedHttpRequest request, final 
HttpServletResponse response)
-    throws IOException, ServletException {
-    final String target = request.getTargetEntity().toLowerCase();
-    final Map<String, List<String>> queryMap = request.getQueryMap();
-
-    final SchedulerResponse result;
-    switch (target) {
-    case "list":
-      result = schedulerDriver.get().getList();
-      break;
-    case "clear":
-      result = schedulerDriver.get().clearList();
-      break;
-    case "status":
-      result = schedulerDriver.get().getTaskStatus(queryMap.get("id"));
-      break;
-    case "submit":
-      result = schedulerDriver.get().submitCommands(queryMap.get("cmd"));
-      break;
-    case "cancel":
-      result = schedulerDriver.get().cancelTask(queryMap.get("id"));
-      break;
-    case "max-eval":
-      result = schedulerDriver.get().setMaxEvaluators(queryMap.get("num"));
-      break;
-    default:
-      result = SchedulerResponse.notFound("Unsupported operation");
-    }
-
-    // Send response to the http client
-    final int status = result.getStatus();
-    final String message= result.getMessage();
-
-    if (result.isOK()) {
-      response.getOutputStream().println(message);
-    } else {
-      response.sendError(status, message);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerResponse.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerResponse.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerResponse.java
deleted file mode 100644
index e3d1272..0000000
--- 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerResponse.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.examples.scheduler.driver;
-
-/**
- * This class specifies the response from the Scheduler.
- * It includes the status code and message.
- */
-final class SchedulerResponse {
-  /**
-   * 200 OK : The request succeeded normally.
-   */
-  private static final int SC_OK = 200;
-
-  /**
-   * 400 BAD REQUEST : The request is syntactically incorrect.
-   */
-  private static final int SC_BAD_REQUEST = 400;
-
-  /**
-   * 403 FORBIDDEN : Syntactically okay but refused to process.
-   */
-  private static final int SC_FORBIDDEN = 403;
-
-  /**
-   * 404 NOT FOUND :  The resource is not available.
-   */
-  private static final int SC_NOT_FOUND = 404;
-
-  /**
-   * Create a response with OK status.
-   */
-  public static SchedulerResponse ok(final String message){
-    return new SchedulerResponse(SC_OK, message);
-  }
-
-  /**
-   * Create a response with BAD_REQUEST status.
-   */
-  public static SchedulerResponse badRequest(final String message){
-    return new SchedulerResponse(SC_BAD_REQUEST, message);
-  }
-
-  /**
-   * Create a response with FORBIDDEN status.
-   */
-  public static SchedulerResponse forbidden(final String message){
-    return new SchedulerResponse(SC_FORBIDDEN, message);
-  }
-
-  /**
-   * Create a response with NOT FOUND status.
-   */
-  public static SchedulerResponse notFound(final String message){
-    return new SchedulerResponse(SC_NOT_FOUND, message);
-  }
-
-  /**
-   * Return {@code true} if the response is OK.
-   */
-  public boolean isOK(){
-    return this.status == SC_OK;
-  }
-
-  /**
-   * Status code of the request based on RFC 2068.
-   */
-  private int status;
-
-  /**
-   * Message to send.
-   */
-  private String message;
-
-  /**
-   * Constructor using status code and message.
-   * @param status
-   * @param message
-   */
-  private SchedulerResponse(final int status, final String message) {
-    this.status = status;
-    this.message = message;
-  }
-
-  /**
-   * Return the status code of this response.
-   */
-  int getStatus() {
-    return status;
-  }
-
-  /**
-   * Return the message of this response.
-   */
-  String getMessage() {
-    return message;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/NotFoundException.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/NotFoundException.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/NotFoundException.java
new file mode 100644
index 0000000..5023e33
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/NotFoundException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler.driver.exceptions;
+
+/**
+ * An object specified in the operation was not found.
+ */
+public final class NotFoundException extends Exception {
+  public NotFoundException(final String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/UnsuccessfulException.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/UnsuccessfulException.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/UnsuccessfulException.java
new file mode 100644
index 0000000..4163aef
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/UnsuccessfulException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler.driver.exceptions;
+
+/**
+ * The operation was unsuccessful, for reasons stated in the message.
+ */
+public final class UnsuccessfulException extends Exception {
+  public UnsuccessfulException(final String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/package-info.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/package-info.java
new file mode 100644
index 0000000..0bc94f6
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/exceptions/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Task scheduler example exceptions.
+ */
+package org.apache.reef.examples.scheduler.driver.exceptions;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/SchedulerHttpHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/SchedulerHttpHandler.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/SchedulerHttpHandler.java
new file mode 100644
index 0000000..c32e828
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/SchedulerHttpHandler.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler.driver.http;
+
+import org.apache.reef.examples.scheduler.driver.SchedulerDriver;
+import org.apache.reef.examples.scheduler.driver.exceptions.NotFoundException;
+import 
org.apache.reef.examples.scheduler.driver.exceptions.UnsuccessfulException;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.webserver.HttpHandler;
+import org.apache.reef.webserver.ParsedHttpRequest;
+
+import javax.inject.Inject;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Receive HttpRequest so that it can handle the command list.
+ */
+public final class SchedulerHttpHandler implements HttpHandler {
+  private final InjectionFuture<SchedulerDriver> schedulerDriver;
+
+  private String uriSpecification = "reef-example-scheduler";
+
+  @Inject
+  private SchedulerHttpHandler(final InjectionFuture<SchedulerDriver> 
schedulerDriver) {
+    this.schedulerDriver = schedulerDriver;
+  }
+
+  @Override
+  public String getUriSpecification() {
+    return uriSpecification;
+  }
+
+  @Override
+  public void setUriSpecification(final String s) {
+    uriSpecification = s;
+  }
+
+  /**
+   * HttpRequest handler. You must specify UriSpecification and API version.
+   * The request url is http://{address}:{port}/scheduler/v1
+   *
+   * APIs
+   *   /list                to get the status list for all tasks
+   *   /status?id={id}      to query the status of such a task, given id
+   *   /submit?cmd={cmd}    to submit a Task, which returns its id
+   *   /cancel?id={id}      to cancel the task's execution
+   *   /max-eval?num={num}  to set the maximum number of evaluators
+   *   /clear               to clear the waiting queue
+   */
+  @Override
+  public void onHttpRequest(final ParsedHttpRequest request, final 
HttpServletResponse response)
+      throws IOException, ServletException {
+    final String target = request.getTargetEntity().toLowerCase();
+    final Map<String, List<String>> queryMap = request.getQueryMap();
+
+    final SchedulerResponse result;
+    switch (target) {
+    case "list":
+      result = onList();
+      break;
+    case "clear":
+      result = onClear();
+      break;
+    case "status":
+      result = onStatus(queryMap);
+      break;
+    case "submit":
+      result = onSubmit(queryMap);
+      break;
+    case "cancel":
+      result = onCancel(queryMap);
+      break;
+    case "max-eval":
+      result = onMaxEval(queryMap);
+      break;
+    default:
+      result = SchedulerResponse.notFound("Unsupported operation");
+    }
+
+    // Send response to the http client
+    final int status = result.getStatus();
+    final String message= result.getMessage();
+
+    if (result.isOK()) {
+      response.getOutputStream().println(message);
+    } else {
+      response.sendError(status, message);
+    }
+  }
+
+  private SchedulerResponse onList() {
+    final Map<String, List<Integer>> listMap = schedulerDriver.get().getList();
+
+    final StringBuilder sb = new StringBuilder();
+    for (final Map.Entry<String, List<Integer>> entry : listMap.entrySet()) {
+      sb.append("\n").append(entry.getKey()).append(" :");
+      for (final int taskId : entry.getValue()) {
+        sb.append(" ").append(taskId);
+      }
+    }
+    return SchedulerResponse.ok(sb.toString());
+  }
+
+  private SchedulerResponse onClear() {
+    final int count = schedulerDriver.get().clearList();
+    return SchedulerResponse.ok(count + " tasks removed.");
+  }
+
+  private SchedulerResponse onStatus(final Map<String, List<String>> queryMap) 
{
+    final List<String> args = queryMap.get("id");
+    if (args.size() != 1) {
+      return SchedulerResponse.badRequest("Usage : only one ID at a time");
+    }
+
+    try {
+
+      final int taskId = Integer.valueOf(args.get(0));
+      return SchedulerResponse.ok(schedulerDriver.get().getTaskStatus(taskId));
+
+    } catch (final NotFoundException e) {
+      return SchedulerResponse.notFound(e.getMessage());
+    } catch (final NumberFormatException e) {
+      return SchedulerResponse.badRequest("Usage : ID must be an integer");
+    }
+  }
+
+  private SchedulerResponse onSubmit(final Map<String, List<String>> queryMap) 
{
+    final List<String> args = queryMap.get("cmd");
+    if (args.size() != 1) {
+      return SchedulerResponse.badRequest("Usage : only one command at a 
time");
+    }
+
+    return SchedulerResponse.ok("Task ID : " + 
schedulerDriver.get().submitCommand(args.get(0)));
+  }
+
+  private SchedulerResponse onCancel(final Map<String, List<String>> queryMap) 
{
+    final List<String> args = queryMap.get("id");
+    if (args.size() != 1) {
+      return SchedulerResponse.badRequest("Usage : only one ID at a time");
+    }
+
+    try {
+
+      final int taskId = Integer.valueOf(args.get(0));
+      final int canceledId = schedulerDriver.get().cancelTask(taskId);
+      return SchedulerResponse.ok("Canceled " + canceledId);
+
+    } catch (final NotFoundException e) {
+      return SchedulerResponse.notFound(e.getMessage());
+    } catch (final UnsuccessfulException e) {
+      return SchedulerResponse.forbidden(e.getMessage());
+    } catch (final NumberFormatException e) {
+      return SchedulerResponse.badRequest("Usage : ID must be an integer");
+    }
+  }
+
+  private SchedulerResponse onMaxEval(final Map<String, List<String>> 
queryMap) {
+    final List<String> args = queryMap.get("num");
+    if (args.size() != 1) {
+      return SchedulerResponse.badRequest("Usage : Only one value can be 
used");
+    }
+
+    try {
+
+      final int targetNum = Integer.valueOf(args.get(0));
+      final int maxEval = schedulerDriver.get().setMaxEvaluators(targetNum);
+      return SchedulerResponse.ok("You can use up to " + maxEval + " 
evaluators.");
+
+    } catch (final UnsuccessfulException e) {
+      return SchedulerResponse.forbidden(e.getMessage());
+    } catch (final NumberFormatException e) {
+      return SchedulerResponse.badRequest("Usage : num must be an integer");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/SchedulerResponse.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/SchedulerResponse.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/SchedulerResponse.java
new file mode 100644
index 0000000..bd03fa8
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/SchedulerResponse.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler.driver.http;
+
+/**
+ * This class specifies the response from the Scheduler.
+ * It includes the status code and message.
+ */
+final class SchedulerResponse {
+  /**
+   * 200 OK : The request succeeded normally.
+   */
+  private static final int SC_OK = 200;
+
+  /**
+   * 400 BAD REQUEST : The request is syntactically incorrect.
+   */
+  private static final int SC_BAD_REQUEST = 400;
+
+  /**
+   * 403 FORBIDDEN : Syntactically okay but refused to process.
+   */
+  private static final int SC_FORBIDDEN = 403;
+
+  /**
+   * 404 NOT FOUND :  The resource is not available.
+   */
+  private static final int SC_NOT_FOUND = 404;
+
+  /**
+   * Create a response with OK status.
+   */
+  public static SchedulerResponse ok(final String message){
+    return new SchedulerResponse(SC_OK, message);
+  }
+
+  /**
+   * Create a response with BAD_REQUEST status.
+   */
+  public static SchedulerResponse badRequest(final String message){
+    return new SchedulerResponse(SC_BAD_REQUEST, message);
+  }
+
+  /**
+   * Create a response with FORBIDDEN status.
+   */
+  public static SchedulerResponse forbidden(final String message){
+    return new SchedulerResponse(SC_FORBIDDEN, message);
+  }
+
+  /**
+   * Create a response with NOT FOUND status.
+   */
+  public static SchedulerResponse notFound(final String message){
+    return new SchedulerResponse(SC_NOT_FOUND, message);
+  }
+
+  /**
+   * Return {@code true} if the response is OK.
+   */
+  public boolean isOK(){
+    return this.status == SC_OK;
+  }
+
+  /**
+   * Status code of the request based on RFC 2068.
+   */
+  private int status;
+
+  /**
+   * Message to send.
+   */
+  private String message;
+
+  /**
+   * Constructor using status code and message.
+   * @param status
+   * @param message
+   */
+  private SchedulerResponse(final int status, final String message) {
+    this.status = status;
+    this.message = message;
+  }
+
+  /**
+   * Return the status code of this response.
+   */
+  int getStatus() {
+    return status;
+  }
+
+  /**
+   * Return the message of this response.
+   */
+  String getMessage() {
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c8a4608/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/package-info.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/package-info.java
new file mode 100644
index 0000000..6e64375
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/http/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Task scheduler example HTTP API.
+ */
+package org.apache.reef.examples.scheduler.driver.http;


Reply via email to