[REEF-491] Initial Vortex implementation
  This addressed the issue by
  * Creating a Java ThreadPool-like user interface
  * Enabling distributed execution of user-submitted tasks
  * Developing a mechanism for handling of resource preemption events
  * Developing simple task scheduling mechanisms
  * Creating simple unit/integration tests to verify correctness.

JIRA:
  [REEF-491](https://issues.apache.org/jira/browse/REEF-491)

Pull Request:
  Closes #310


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/2c994ec9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/2c994ec9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/2c994ec9

Branch: refs/heads/master
Commit: 2c994ec9714c2ce30f237c1b808e6335019e754b
Parents: 0181e2c
Author: John Yang <[email protected]>
Authored: Tue Jun 16 10:46:43 2015 +0800
Committer: Byung-Gon Chun <[email protected]>
Committed: Tue Aug 4 15:41:07 2015 +0900

----------------------------------------------------------------------
 lang/java/reef-applications/pom.xml             |   8 -
 .../reef-applications/reef-vortex/README.md     |  99 +++++++
 lang/java/reef-applications/reef-vortex/pom.xml |  62 ++++-
 .../org/apache/reef/vortex/VortexDriver.java    |  60 -----
 .../apache/reef/vortex/api/VortexFunction.java  |  43 ++++
 .../apache/reef/vortex/api/VortexFuture.java    | 106 ++++++++
 .../org/apache/reef/vortex/api/VortexStart.java |  34 +++
 .../reef/vortex/api/VortexThreadPool.java       |  50 ++++
 .../apache/reef/vortex/api/package-info.java    |  22 ++
 .../vortex/common/TaskletExecutionRequest.java  |  68 +++++
 .../vortex/common/TaskletFailureReport.java     |  61 +++++
 .../reef/vortex/common/TaskletResultReport.java |  64 +++++
 .../reef/vortex/common/VortexRequest.java       |  41 +++
 .../apache/reef/vortex/common/WorkerReport.java |  42 +++
 .../apache/reef/vortex/common/package-info.java |  22 ++
 .../reef/vortex/driver/DefaultVortexMaster.java | 108 ++++++++
 .../vortex/driver/PendingTaskletScheduler.java  |  59 +++++
 .../reef/vortex/driver/PendingTasklets.java     |  51 ++++
 .../reef/vortex/driver/RunningWorkers.java      | 257 +++++++++++++++++++
 .../org/apache/reef/vortex/driver/Tasklet.java  |  96 +++++++
 .../reef/vortex/driver/VortexConfHelper.java    |  66 +++++
 .../apache/reef/vortex/driver/VortexDriver.java | 193 ++++++++++++++
 .../reef/vortex/driver/VortexLauncher.java      |  62 +++++
 .../apache/reef/vortex/driver/VortexMaster.java |  67 +++++
 .../reef/vortex/driver/VortexMasterConf.java    |  98 +++++++
 .../reef/vortex/driver/VortexRequestor.java     |  50 ++++
 .../reef/vortex/driver/VortexStartExecutor.java |  52 ++++
 .../reef/vortex/driver/VortexWorkerConf.java    |  53 ++++
 .../reef/vortex/driver/VortexWorkerManager.java |  95 +++++++
 .../apache/reef/vortex/driver/package-info.java |  22 ++
 .../reef/vortex/evaluator/VortexWorker.java     | 160 ++++++++++++
 .../reef/vortex/evaluator/package-info.java     |  22 ++
 .../reef/vortex/examples/addone/AddOne.java     |  36 +++
 .../vortex/examples/addone/AddOneFunction.java  |  34 +++
 .../vortex/examples/addone/AddOneStart.java     |  67 +++++
 .../vortex/examples/addone/package-info.java    |  22 ++
 .../reef/vortex/examples/hello/HelloVortex.java |  36 +++
 .../examples/hello/HelloVortexFunction.java     |  37 +++
 .../vortex/examples/hello/HelloVortexStart.java |  48 ++++
 .../vortex/examples/hello/package-info.java     |  22 ++
 .../org/apache/reef/vortex/package-info.java    |   2 +-
 .../vortex/driver/DefaultVortexMasterTest.java  | 158 ++++++++++++
 .../reef/vortex/driver/RunningWorkersTest.java  |  64 +++++
 .../org/apache/reef/vortex/driver/TestUtil.java |  66 +++++
 .../apache/reef/vortex/driver/package-info.java |  22 ++
 lang/java/reef-tests/pom.xml                    |   5 +
 .../org/apache/reef/tests/AllTestsSuite.java    |   4 +-
 .../applications/ApplicationTestSuite.java      |  30 +++
 .../reef/tests/applications/package-info.java   |  22 ++
 .../applications/vortex/VortexTestSuite.java    |  30 +++
 .../vortex/addone/AddOneFunction.java           |  34 +++
 .../applications/vortex/addone/AddOneTest.java  |  63 +++++
 .../vortex/addone/AddOneTestStart.java          |  67 +++++
 .../vortex/addone/package-info.java             |  22 ++
 .../tests/applications/vortex/package-info.java |  22 ++
 55 files changed, 3135 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/pom.xml 
b/lang/java/reef-applications/pom.xml
index 828ace3..b7ddb2c 100644
--- a/lang/java/reef-applications/pom.xml
+++ b/lang/java/reef-applications/pom.xml
@@ -31,14 +31,6 @@ under the License.
         <relativePath>../../..</relativePath>
     </parent>
 
-    <dependencies>
-        <dependency>
-            <groupId>${project.groupId}</groupId>
-            <artifactId>reef-common</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-    </dependencies>
-
     <modules>
         <module>reef-vortex</module>
     </modules>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/README.md
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/README.md 
b/lang/java/reef-applications/reef-vortex/README.md
new file mode 100644
index 0000000..0ea6518
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/README.md
@@ -0,0 +1,99 @@
+Vortex
+========================
+Vortex is a runtime that lets low-priority jobs cope effectively
+with non-cooperative (thus low-latency) preemptions. It has the following goals:
+
+* Enable high-priority jobs such as latency-sensitive online serving jobs to 
reclaim resources quickly
+* Enable low-priority jobs such as offline batch data analytics jobs to 
efficiently utilize volatile resources
+
+Vortex is currently under [active 
development](https://issues.apache.org/jira/browse/REEF-364)
+
+Key Components
+========================
+* ThreadPool API
+    * Users can submit custom functions and inputs to Vortex and retrieve 
results
+    * This API hides the details of distributed execution on unreliable 
resources(e.g. handling of preemption)
+* Tasklet
+    * User-submitted function and input are translated into Tasklet(s) in 
VortexMaster
+    * A Tasklet is then scheduled to and executed by a VortexWorker
+* Queue
+    * VortexMaster Queue(in REEF Driver): Keeps track of Tasklets waiting to 
be scheduled/enqueued to a VortexWorker Queue
+    * VortexWorker Queue(in REEF Evaluator): Keeps track of Tasklets waiting 
to be executed
+
+Example: Vector Calculation on Vortex
+====================
+```java
+/**
+ * User's main function.
+ */
+public final class AddOne {
+  private AddOne() {
+  }
+
+  public static void main(final String[] args) {
+    VortexLauncher.launchLocal("Vortex_Example_AddOne", AddOneStart.class, 2, 
1024, 4);
+  }
+}
+```
+
+* Through `VortexLauncher`, the user launches a Vortex job, passing the following arguments:
+  * Name of the job 
+  * Implementation of VortexStart(`AddOneStart`)
+  * Amount of resources to use
+
+```java
+/**
+ * AddOne User Code Example.
+ */
+final class AddOneStart implements VortexStart {
+  @Inject
+  public AddOneStart() {
+  }
+
+  @Override
+  public void start(final VortexThreadPool vortexThreadPool) {
+    final Vector<Integer> inputVector = new Vector<>();
+    for (int i = 0; i < 1000; i++) {
+      inputVector.add(i);
+    }
+
+    final List<VortexFuture<Integer>> futures = new ArrayList<>();
+    final AddOneFunction addOneFunction = new AddOneFunction();
+    for (final int i : inputVector) {
+      futures.add(vortexThreadPool.submit(addOneFunction, i));
+    }
+
+    final Vector<Integer> outputVector = new Vector<>();
+    for (final VortexFuture<Integer> future : futures) {
+      try {
+        outputVector.add(future.get());
+      } catch (InterruptedException | ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    System.out.println("RESULT:");
+    System.out.println(outputVector);
+  }
+}
+```
+
+* Through `VortexThreadPool#submit`, the user submits a custom function (`AddOneFunction`) and its input to be executed on the Vortex runtime
+* Using the returned `VortexFuture`, the user retrieves the execution results
+
+```java
+/**
+ * Outputs input + 1.
+ */
+final class AddOneFunction implements VortexFunction<Integer, Integer> {
+  @Override
+  public Integer call(final Integer input) throws Exception {
+    return input + 1;
+  }
+}
+```
+* The user's implementation of `VortexFunction` takes an integer as input and outputs input + 1
+
+
+
+ 
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/pom.xml 
b/lang/java/reef-applications/reef-vortex/pom.xml
index 175c4a5..2f47ab2 100644
--- a/lang/java/reef-applications/reef-vortex/pom.xml
+++ b/lang/java/reef-applications/reef-vortex/pom.xml
@@ -20,7 +20,7 @@ under the License.
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
-    <artifactId>reef-vortex</artifactId>
+    <artifactId>vortex</artifactId>
     <name>REEF Vortex</name>
 
     <parent>
@@ -28,4 +28,64 @@ under the License.
         <artifactId>reef-applications</artifactId>
         <version>0.12.0-incubating-SNAPSHOT</version>
     </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>reef-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>reef-runtime-local</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <outputFile>
+                        
${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar
+                    </outputFile>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>${maven-checkstyle-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <id>validate</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>check</goal>
+                            <goal>checkstyle</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/VortexDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/VortexDriver.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/VortexDriver.java
deleted file mode 100644
index d3a4c70..0000000
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/VortexDriver.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex;
-
-import org.apache.reef.driver.evaluator.AllocatedEvaluator;
-import org.apache.reef.driver.evaluator.EvaluatorRequest;
-import org.apache.reef.driver.evaluator.EvaluatorRequestor;
-import org.apache.reef.tang.annotations.Unit;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.event.StartTime;
-
-import javax.inject.Inject;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-@Unit
-public final class VortexDriver {
-  private static final Logger LOG = 
Logger.getLogger(VortexDriver.class.getName());
-  private final EvaluatorRequestor requestor;
-
-  @Inject
-  public VortexDriver(final EvaluatorRequestor requestor) {
-    this.requestor = requestor;
-  }
-
-  public final class StartHandler implements EventHandler<StartTime> {
-    @Override
-    public void onNext(final StartTime startTime) {
-      requestor.submit(EvaluatorRequest.newBuilder()
-          .setNumber(1)
-          .setMemory(64)
-          .setNumberOfCores(1)
-          .build());
-    }
-  }
-
-  public final class EvaluatorAllocatedHandler implements 
EventHandler<AllocatedEvaluator> {
-    @Override
-    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
-      LOG.log(Level.INFO, "Evaluator Allocated: {0}", allocatedEvaluator);
-      allocatedEvaluator.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java
new file mode 100644
index 0000000..96e47b6
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.api;
+
+import org.apache.reef.annotations.Unstable;
+
+import java.io.Serializable;
+
+/**
+ * Typed user function.
+ * Implement your functions using this interface.
+ * The function itself, its input type and its output type must all be Serializable.
+ * TODO[REEF-504]: Clean up Serializable in Vortex.
+ *
+ * @param <TInput> input type
+ * @param <TOutput> output type
+ */
+@Unstable
+public interface VortexFunction<TInput extends Serializable, TOutput extends 
Serializable> extends Serializable {
+  /**
+   * Runs the user logic on the given input.
+   *
+   * @param input of the function
+   * @return output of the function
+   * @throws Exception thrown here will bubble up in VortexFuture#get as ExecutionException.
+   * Throw only after all resources are released, as they cannot be cleared by Vortex.
+   * For example, if threads are spawned here, shut them down before throwing an exception.
+   */
+  TOutput call(TInput input) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
new file mode 100644
index 0000000..09c9caf
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.api;
+
+import org.apache.reef.annotations.Unstable;
+
+import java.util.concurrent.*;
+
+/**
+ * The interface between user code and submitted task.
+ * The outcome (a result or an exception) is stored by {@link #completed} or
+ * {@link #threwException}, which then release the latch that {@code get} waits on.
+ * The outcome fields are written before the latch is counted down and read only
+ * after the latch has been awaited.
+ * TODO[REEF-505]: Callback features for VortexFuture.
+ */
+@Unstable
+public final class VortexFuture<TOutput> implements Future<TOutput> {
+  private TOutput userResult;
+  private Exception userException;
+  // Explicit failure marker, so that a legitimately null result is not mistaken for a failure.
+  private boolean failed;
+  private final CountDownLatch countDownLatch = new CountDownLatch(1);
+
+  /**
+   * TODO[REEF-502]: Support Vortex Tasklet(s) cancellation by user.
+   *
+   * @throws UnsupportedOperationException always, as cancellation is not yet supported.
+   */
+  @Override
+  public boolean cancel(final boolean mayInterruptIfRunning) {
+    throw new UnsupportedOperationException("Cancel not yet supported");
+  }
+
+  /**
+   * TODO[REEF-502]: Support Vortex Tasklet(s) cancellation by user.
+   *
+   * @throws UnsupportedOperationException always, as cancellation is not yet supported.
+   */
+  @Override
+  public boolean isCancelled() {
+    throw new UnsupportedOperationException("Cancel not yet supported");
+  }
+
+  /**
+   * @return true if the task completed (with a result or an exception), false if not.
+   */
+  @Override
+  public boolean isDone() {
+    return countDownLatch.getCount() == 0;
+  }
+
+  /**
+   * Infinitely wait for the result of the task.
+   *
+   * @return the result of the task, which may be null if the user function returned null.
+   * @throws InterruptedException if the waiting thread is interrupted.
+   * @throws ExecutionException wrapping the exception reported through {@link #threwException}.
+   */
+  @Override
+  public TOutput get() throws InterruptedException, ExecutionException {
+    countDownLatch.await();
+    return resultOrThrow();
+  }
+
+  /**
+   * Wait a certain period of time for the result of the task.
+   *
+   * @param timeout the maximum time to wait.
+   * @param unit the unit of the timeout.
+   * @return the result of the task, which may be null if the user function returned null.
+   * @throws InterruptedException if the waiting thread is interrupted.
+   * @throws ExecutionException wrapping the exception reported through {@link #threwException}.
+   * @throws TimeoutException if the task did not complete within the given time.
+   */
+  @Override
+  public TOutput get(final long timeout, final TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    if (!countDownLatch.await(timeout, unit)) {
+      throw new TimeoutException();
+    }
+    return resultOrThrow();
+  }
+
+  /**
+   * Called by VortexMaster to let the user know that the task completed.
+   *
+   * @param result of the task.
+   */
+  public void completed(final TOutput result) {
+    this.userResult = result;
+    this.countDownLatch.countDown();
+  }
+
+  /**
+   * Called by VortexMaster to let the user know that the task threw an exception.
+   *
+   * @param exception thrown by the task.
+   */
+  public void threwException(final Exception exception) {
+    this.userException = exception;
+    this.failed = true;
+    this.countDownLatch.countDown();
+  }
+
+  /**
+   * Shared tail of both get() variants; must only be called once the latch has been released.
+   * Decides on the failure marker rather than on the result being non-null.
+   */
+  private TOutput resultOrThrow() throws ExecutionException {
+    if (failed) {
+      throw new ExecutionException(userException);
+    }
+    return userResult;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexStart.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexStart.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexStart.java
new file mode 100644
index 0000000..50f9308
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexStart.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.api;
+
+import org.apache.reef.annotations.Unstable;
+
+/**
+ * The starting point of Vortex user thread.
+ */
+@Unstable
+public interface VortexStart {
+  /**
+   * Implement this method, using VortexThreadPool to run jobs on Vortex.
+   * Your implementation of this interface will be instantiated by Tang
+   * and this method will be called upon the start of REEF Driver.
+   *
+   * @param vortexThreadPool the pool through which functions and their inputs are submitted.
+   */
+  void start(final VortexThreadPool vortexThreadPool);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
new file mode 100644
index 0000000..25c5c90
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.api;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.vortex.driver.VortexMaster;
+
+import javax.inject.Inject;
+import java.io.Serializable;
+
+/**
+ * Distributed thread pool.
+ * Submissions are forwarded to the VortexMaster, which enqueues them as tasklets.
+ * The constructor is private and annotated with Inject, so instances are obtained through injection.
+ */
+@Unstable
+public final class VortexThreadPool {
+  private final VortexMaster vortexMaster;
+
+  @Inject
+  private VortexThreadPool(final VortexMaster vortexMaster) {
+    this.vortexMaster = vortexMaster;
+  }
+
+  /**
+   * Submits a function and its input for execution by delegating to VortexMaster#enqueueTasklet.
+   *
+   * @param function to run on Vortex
+   * @param input of the function
+   * @param <TInput> input type
+   * @param <TOutput> output type
+   * @return VortexFuture for tracking execution progress
+   */
+  public <TInput extends Serializable, TOutput extends Serializable> 
VortexFuture<TOutput>
+      submit(final VortexFunction<TInput, TOutput> function, final TInput 
input) {
+    return vortexMaster.enqueueTasklet(function, input);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/package-info.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/package-info.java
new file mode 100644
index 0000000..9a7633e
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Vortex user api for running jobs on Vortex.
+ */
+package org.apache.reef.vortex.api;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
new file mode 100644
index 0000000..2c60c8a
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.vortex.api.VortexFunction;
+
+import java.io.Serializable;
+
+/**
+ * Request to execute a tasklet.
+ * Bundles the tasklet id with the user function and the input it should be called with.
+ *
+ * @param <TInput> input type of the user function
+ * @param <TOutput> output type of the user function
+ */
+@Unstable
+public final class TaskletExecutionRequest<TInput extends Serializable, 
TOutput extends Serializable>
+    implements VortexRequest {
+  private final int taskletId;
+  private final VortexFunction<TInput, TOutput> userFunction;
+  private final TInput input;
+
+  /**
+   * @return the type of this VortexRequest, always RequestType.ExecuteTasklet.
+   */
+  @Override
+  public RequestType getType() {
+    return RequestType.ExecuteTasklet;
+  }
+
+  /**
+   * Vortex Master -> Vortex Worker request to execute a tasklet.
+   *
+   * @param taskletId id of the tasklet to execute.
+   * @param userFunction the user function to call.
+   * @param input the input to pass to the user function.
+   */
+  public TaskletExecutionRequest(final int taskletId,
+                                 final VortexFunction<TInput, TOutput> 
userFunction,
+                                 final TInput input) {
+    this.taskletId = taskletId;
+    this.userFunction = userFunction;
+    this.input = input;
+  }
+
+  /**
+   * Execute the function using the input.
+   *
+   * @return the value returned by the user function.
+   * @throws Exception whatever the user function throws; it is not caught here.
+   */
+  public TOutput execute() throws Exception {
+    return userFunction.call(input);
+  }
+
+  /**
+   * Get id of the tasklet.
+   *
+   * @return the id given at construction.
+   */
+  public int getTaskletId() {
+    return taskletId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
new file mode 100644
index 0000000..dc847e7
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.reef.annotations.Unstable;
+
+/**
+ * Report of a tasklet exception.
+ * Immutable pair of the failed tasklet's id and the exception that caused the failure.
+ */
+@Unstable
+public final class TaskletFailureReport implements WorkerReport {
+  private final int taskletId;
+  private final Exception exception;
+
+  /**
+   * @param taskletId of the failed tasklet.
+   * @param exception that caused the tasklet failure.
+   */
+  public TaskletFailureReport(final int taskletId, final Exception exception) {
+    this.taskletId = taskletId;
+    this.exception = exception;
+  }
+
+  /**
+   * @return the type of this WorkerReport, always WorkerReportType.TaskletFailure.
+   */
+  @Override
+  public WorkerReportType getType() {
+    return WorkerReportType.TaskletFailure;
+  }
+
+  /**
+   * @return the id of the tasklet.
+   */
+  public int getTaskletId() {
+    return taskletId;
+  }
+
+  /**
+   * @return the exception that caused the tasklet failure.
+   */
+  public Exception getException() {
+    return exception;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
new file mode 100644
index 0000000..0a6fab3
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.reef.annotations.Unstable;
+
+import java.io.Serializable;
+
+/**
+ * WorkerReport carrying the result of a successfully executed tasklet.
+ * Instances are immutable: both fields are set once in the constructor.
+ *
+ * @param <TOutput> type of the tasklet result.
+ */
+@Unstable
+public final class TaskletResultReport<TOutput extends Serializable> implements WorkerReport {
+  private final int taskletId;
+  private final TOutput result;
+
+  /**
+   * Creates a result report for a single tasklet.
+   *
+   * @param taskletId id of the tasklet that produced the result.
+   * @param result value produced by the tasklet execution.
+   */
+  public TaskletResultReport(final int taskletId, final TOutput result) {
+    this.taskletId = taskletId;
+    this.result = result;
+  }
+
+  /**
+   * @return the id of the tasklet.
+   */
+  public int getTaskletId() {
+    return this.taskletId;
+  }
+
+  /**
+   * @return the result of the tasklet execution.
+   */
+  public TOutput getResult() {
+    return this.result;
+  }
+
+  /**
+   * @return the type of this WorkerReport, always {@code TaskletResult}.
+   */
+  @Override
+  public WorkerReportType getType() {
+    return WorkerReportType.TaskletResult;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
new file mode 100644
index 0000000..967ab98
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.reef.annotations.Unstable;
+
+import java.io.Serializable;
+
+/**
+ * Master -> Worker protocol.
+ * Every request is {@link Serializable} and identifies its kind through {@link #getType()}.
+ */
+@Unstable
+public interface VortexRequest extends Serializable {
+  /**
+   * Type of Request.
+   * {@code ExecuteTasklet} is the only request type declared here.
+   */
+  enum RequestType {
+    ExecuteTasklet
+  }
+
+  /**
+   * @return the type of this VortexRequest.
+   */
+  RequestType getType();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
new file mode 100644
index 0000000..4444f3c
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.reef.annotations.Unstable;
+
+import java.io.Serializable;
+
+/**
+ * Worker -> Master protocol.
+ * Every report is {@link Serializable} and identifies its kind through {@link #getType()}.
+ */
+@Unstable
+public interface WorkerReport extends Serializable {
+  /**
+   * Type of WorkerReport.
+   * {@code TaskletResult} is returned by TaskletResultReport and
+   * {@code TaskletFailure} by TaskletFailureReport.
+   */
+  enum WorkerReportType {
+    TaskletResult,
+    TaskletFailure
+  }
+
+  /**
+   * @return the type of this WorkerReport.
+   */
+  WorkerReportType getType();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/package-info.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/package-info.java
new file mode 100644
index 0000000..35cf59e
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Vortex Code used both in Vortex Driver and Vortex Evaluator.
+ */
+package org.apache.reef.vortex.common;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
new file mode 100644
index 0000000..f1a55b7
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.driver;
+
+import net.jcip.annotations.ThreadSafe;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.vortex.api.VortexFunction;
+import org.apache.reef.vortex.api.VortexFuture;
+
+import javax.inject.Inject;
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Default implementation of VortexMaster.
+ * Uses two thread-safe data structures(pendingTasklets, runningWorkers) in implementing VortexMaster interface.
+ */
+@ThreadSafe
+@DriverSide
+final class DefaultVortexMaster implements VortexMaster {
+  private final AtomicInteger taskletIdCounter = new AtomicInteger();
+  private final RunningWorkers runningWorkers;
+  private final PendingTasklets pendingTasklets;
+
+  /**
+   * @param runningWorkers for managing all running workers.
+   * @param pendingTasklets for queueing the tasklets that wait to be launched.
+   */
+  @Inject
+  DefaultVortexMaster(final RunningWorkers runningWorkers,
+                      final PendingTasklets pendingTasklets) {
+    this.runningWorkers = runningWorkers;
+    this.pendingTasklets = pendingTasklets;
+  }
+
+  /**
+   * Wrap the user function and its input into a new tasklet with a fresh id
+   * and append it to pendingTasklets.
+   *
+   * @return the future through which the tasklet outcome is reported.
+   */
+  @Override
+  public <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput>
+      enqueueTasklet(final VortexFunction<TInput, TOutput> function, final TInput input) {
+    // TODO[REEF-500]: Simple duplicate Vortex Tasklet launch.
+    final VortexFuture<TOutput> future = new VortexFuture<>();
+    final int taskletId = taskletIdCounter.getAndIncrement();
+    final Tasklet<TInput, TOutput> tasklet = new Tasklet<>(taskletId, function, input, future);
+    pendingTasklets.addLast(tasklet);
+    return future;
+  }
+
+  /**
+   * Register a newly allocated worker with runningWorkers.
+   */
+  @Override
+  public void workerAllocated(final VortexWorkerManager vortexWorkerManager) {
+    this.runningWorkers.addWorker(vortexWorkerManager);
+  }
+
+  /**
+   * Drop the worker from runningWorkers and put every tasklet it had been running
+   * back at the head of pendingTasklets.
+   */
+  @Override
+  public void workerPreempted(final String id) {
+    for (final Tasklet lostTasklet : this.runningWorkers.removeWorker(id)) {
+      this.pendingTasklets.addFirst(lostTasklet);
+    }
+  }
+
+  /**
+   * Forward a tasklet completion to runningWorkers.
+   */
+  @Override
+  public void taskletCompleted(final String workerId,
+                               final int taskletId,
+                               final Serializable result) {
+    this.runningWorkers.completeTasklet(workerId, taskletId, result);
+  }
+
+  /**
+   * Forward a tasklet failure to runningWorkers.
+   */
+  @Override
+  public void taskletErrored(final String workerId, final int taskletId, final Exception exception) {
+    this.runningWorkers.errorTasklet(workerId, taskletId, exception);
+  }
+
+  /**
+   * Terminate the job by terminating runningWorkers.
+   */
+  @Override
+  public void terminate() {
+    this.runningWorkers.terminate();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletScheduler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletScheduler.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletScheduler.java
new file mode 100644
index 0000000..6a0b8b0
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletScheduler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Pending Tasklet Scheduler.
+ * Moves tasklets from PendingTasklets to RunningWorkers until RunningWorkers is terminated
+ * or the scheduling thread is interrupted.
+ */
+@DriverSide
+final class PendingTaskletScheduler implements EventHandler<Integer> {
+  private static final Logger LOG = Logger.getLogger(PendingTaskletScheduler.class.getName());
+
+  private final RunningWorkers runningWorkers;
+  private final PendingTasklets pendingTasklets;
+
+  /**
+   * @param runningWorkers used to launch tasklets and to detect termination.
+   * @param pendingTasklets queue the tasklets are taken from.
+   */
+  @Inject
+  private PendingTaskletScheduler(final RunningWorkers runningWorkers,
+                                  final PendingTasklets pendingTasklets) {
+    this.runningWorkers = runningWorkers;
+    this.pendingTasklets = pendingTasklets;
+  }
+
+  /**
+   * Repeatedly take a tasklet from the pending queue and schedule/launch it via RunningWorkers.
+   * The loop ends when RunningWorkers reports termination or when the thread is interrupted.
+   *
+   * @param integer the triggering event; its value is not used.
+   */
+  @Override
+  public void onNext(final Integer integer) {
+    while (!runningWorkers.isTerminated()) {
+      try {
+        final Tasklet tasklet = pendingTasklets.takeFirst(); // blocks when no tasklet exists
+        runningWorkers.launchTasklet(tasklet); // blocks when no worker exists
+      } catch (final InterruptedException e) {
+        LOG.log(Level.INFO, "Interrupted upon termination");
+        // Keep the interrupt visible to the owner of this thread and stop scheduling.
+        // Looping again would re-enter the blocking takeFirst() and could hang forever
+        // if the interrupt arrived before the terminated flag was set.
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java
new file mode 100644
index 0000000..eac4fc6
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.driver;
+
+import net.jcip.annotations.ThreadSafe;
+import org.apache.reef.annotations.audience.DriverSide;
+
+import javax.inject.Inject;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ * Keeps track of all pending tasklets.
+ * All operations delegate to a single unbounded LinkedBlockingDeque.
+ */
+@ThreadSafe
+@DriverSide
+final class PendingTasklets {
+  private final BlockingDeque<Tasklet> queue = new LinkedBlockingDeque<>();
+
+  @Inject
+  PendingTasklets() {
+  }
+
+  /**
+   * Insert the tasklet at the head of the queue, so that it is taken next.
+   */
+  void addFirst(final Tasklet tasklet) {
+    this.queue.addFirst(tasklet);
+  }
+
+  /**
+   * Append the tasklet at the tail of the queue.
+   */
+  void addLast(final Tasklet tasklet) {
+    this.queue.addLast(tasklet);
+  }
+
+  /**
+   * Remove and return the tasklet at the head of the queue, waiting while the queue is empty.
+   *
+   * @throws InterruptedException if the calling thread is interrupted while waiting.
+   */
+  Tasklet takeFirst() throws InterruptedException {
+    return this.queue.takeFirst();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
new file mode 100644
index 0000000..f712786
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.driver;
+
+import net.jcip.annotations.ThreadSafe;
+import org.apache.reef.annotations.audience.DriverSide;
+
+import javax.inject.Inject;
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Keeps track of all running VortexWorkers and Tasklets.
+ * Upon Tasklet launch request, randomly schedules it to a VortexWorkerManager.
+ *
+ * Locking: the worker map is guarded by a read/write lock. Adding/removing workers and
+ * terminating take the write lock; launching/completing tasklets take the read lock.
+ * The volatile terminated flag is checked once without the lock as a fast path and
+ * once more after the lock has been acquired.
+ */
+@ThreadSafe
+@DriverSide
+final class RunningWorkers {
+  // RunningWorkers and its locks
+  private final Map<String, VortexWorkerManager> runningWorkers = new HashMap<>(); // Running workers/tasklets
+  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+  private final Lock readLock = rwLock.readLock(); // Need to acquire this to launch/complete tasklets
+  private final Lock writeLock = rwLock.writeLock(); // Need to acquire this to add/remove worker
+  private final Condition noRunningWorker = writeLock.newCondition(); // When there's no running worker
+
+  // To keep track of workers that are preempted before acknowledged
+  private final Set<String> removedBeforeAddedWorkers = new HashSet<>();
+
+  // Shared by all worker selections instead of creating a new generator per launch
+  private final Random random = new Random();
+
+  // Terminated
+  private volatile boolean terminated = false;
+
+  /**
+   * RunningWorkers constructor.
+   */
+  @Inject
+  RunningWorkers() {
+  }
+
+  /**
+   * Registers a worker, unless it was already reported as removed or this object is terminated.
+   * If this object is terminated, the worker itself is terminated instead.
+   *
+   * Concurrency: Called by multiple threads.
+   * Parameter: Called exactly once per vortexWorkerManager.
+   */
+  void addWorker(final VortexWorkerManager vortexWorkerManager) {
+    if (!terminated) {
+      writeLock.lock();
+      try {
+        if (!terminated) { // Re-check under the lock: terminate() may have run in between
+          if (!removedBeforeAddedWorkers.contains(vortexWorkerManager.getId())) {
+            this.runningWorkers.put(vortexWorkerManager.getId(), vortexWorkerManager);
+
+            // Notify (possibly) waiting scheduler
+            if (runningWorkers.size() == 1) {
+              noRunningWorker.signalAll();
+            }
+          }
+          return;
+        }
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    // Terminate the worker
+    vortexWorkerManager.terminate();
+  }
+
+  /**
+   * Unregisters a worker and returns what its VortexWorkerManager reports on removal.
+   * Returns an empty collection if the worker is unknown or this object is terminated.
+   *
+   * Concurrency: Called by multiple threads.
+   * Parameter: Called exactly once per id.
+   */
+  Collection<Tasklet> removeWorker(final String id) {
+    if (!terminated) {
+      writeLock.lock();
+      try {
+        if (!terminated) { // Re-check under the lock: terminate() may have run in between
+          final VortexWorkerManager vortexWorkerManager = this.runningWorkers.remove(id);
+          if (vortexWorkerManager != null) {
+            return vortexWorkerManager.removed();
+          } else {
+            // Called before addWorker (e.g. RM preempted the resource before the Evaluator started)
+            removedBeforeAddedWorkers.add(id);
+            return new ArrayList<>(0);
+          }
+        }
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    // No need to return anything since it is terminated
+    return new ArrayList<>(0);
+  }
+
+  /**
+   * Launches the tasklet on a randomly chosen worker, waiting until at least one worker is running.
+   * Returns without launching if this object is, or becomes, terminated.
+   *
+   * Concurrency: Called by single scheduler thread.
+   * Parameter: Same tasklet can be launched multiple times.
+   *
+   * @throws RuntimeException wrapping the InterruptedException if the thread is interrupted while
+   *         waiting for a worker; the interrupt status of the thread is restored first.
+   */
+  void launchTasklet(final Tasklet tasklet) {
+    if (terminated) {
+      return;
+    }
+
+    readLock.lock();
+    // Tracks whether the read lock is currently held, so that the final unlock never
+    // releases a lock that was given up while waiting for a worker.
+    boolean holdsReadLock = true;
+    try {
+      if (terminated) { // Re-check under the lock: terminate() may have run in between
+        return;
+      }
+
+      // Wait until there is a running worker
+      if (runningWorkers.isEmpty()) {
+        // The condition belongs to the write lock, and a read lock cannot be upgraded
+        readLock.unlock();
+        holdsReadLock = false;
+        writeLock.lock();
+        try {
+          while (!terminated && runningWorkers.isEmpty()) {
+            try {
+              noRunningWorker.await();
+            } catch (final InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new RuntimeException(e);
+            }
+          }
+          if (terminated) { // Woken up by terminate(): there is nothing to launch on
+            return;
+          }
+          // Downgrade to the read lock before giving up the write lock
+          readLock.lock();
+          holdsReadLock = true;
+        } finally {
+          writeLock.unlock();
+        }
+      }
+
+      final VortexWorkerManager vortexWorkerManager = randomlyChooseWorker();
+      vortexWorkerManager.launchTasklet(tasklet);
+    } finally {
+      if (holdsReadLock) {
+        readLock.unlock();
+      }
+    }
+  }
+
+  /**
+   * Forwards a tasklet result to the worker it ran on; ignored if the worker is no longer registered.
+   *
+   * Concurrency: Called by multiple threads.
+   * Parameter: Same arguments can come in multiple times.
+   * (e.g. preemption message coming before tasklet completion message multiple times)
+   */
+  void completeTasklet(final String workerId,
+                       final int taskletId,
+                       final Serializable result) {
+    if (terminated) {
+      return;
+    }
+
+    readLock.lock();
+    try {
+      if (terminated) { // Re-check under the lock: terminate() may have run in between
+        return;
+      }
+      final VortexWorkerManager vortexWorkerManager = runningWorkers.get(workerId);
+      if (vortexWorkerManager != null) { // Preemption can come before
+        vortexWorkerManager.taskletCompleted(taskletId, result);
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Forwards a tasklet exception to the worker it ran on; ignored if the worker is no longer registered.
+   *
+   * Concurrency: Called by multiple threads.
+   * Parameter: Same arguments can come in multiple times.
+   * (e.g. preemption message coming before tasklet error message multiple times)
+   */
+  void errorTasklet(final String workerId,
+                    final int taskletId,
+                    final Exception exception) {
+    if (terminated) {
+      return;
+    }
+
+    readLock.lock();
+    try {
+      if (terminated) { // Re-check under the lock: terminate() may have run in between
+        return;
+      }
+      final VortexWorkerManager vortexWorkerManager = runningWorkers.get(workerId);
+      if (vortexWorkerManager != null) { // Preemption can come before
+        vortexWorkerManager.taskletThrewException(taskletId, exception);
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Marks this object as terminated, terminates and forgets every registered worker, and
+   * wakes up a scheduler that may be waiting for a worker.
+   *
+   * @throws RuntimeException if this object has already been terminated.
+   */
+  void terminate() {
+    if (!terminated) {
+      writeLock.lock();
+      try {
+        if (!terminated) { // Re-check under the lock: another terminate() may have run in between
+          terminated = true;
+          for (final VortexWorkerManager vortexWorkerManager : runningWorkers.values()) {
+            vortexWorkerManager.terminate();
+          }
+          runningWorkers.clear();
+          // Release a scheduler blocked in launchTasklet; no worker will ever be added again
+          noRunningWorker.signalAll();
+          return;
+        }
+      } finally {
+        writeLock.unlock();
+      }
+    }
+    throw new RuntimeException("Attempting to terminate an already terminated RunningWorkers");
+  }
+
+  /**
+   * @return true once terminate() has been called.
+   */
+  boolean isTerminated() {
+    return terminated;
+  }
+
+  /**
+   * Picks one registered worker uniformly at random.
+   * Must be called with the read lock held and at least one worker registered.
+   */
+  private VortexWorkerManager randomlyChooseWorker() {
+    final Collection<VortexWorkerManager> workers = runningWorkers.values();
+    final int index = random.nextInt(workers.size());
+    int i = 0;
+    for (final VortexWorkerManager vortexWorkerManager : workers) {
+      if (i == index) {
+        return vortexWorkerManager;
+      } else {
+        i++;
+      }
+    }
+    throw new RuntimeException("Bad Index");
+  }
+
+  ///////////////////////////////////////// For Tests Only
+
+  /**
+   * For unit tests to check whether the worker is running.
+   */
+  boolean isWorkerRunning(final String workerId) {
+    readLock.lock(); // The map is not thread-safe by itself
+    try {
+      return runningWorkers.containsKey(workerId);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * For unit tests to see where a tasklet is scheduled to.
+   * @param taskletId id of the tasklet in question
+   * @return id of the worker (null if the tasklet was not scheduled to any worker)
+   */
+  String getWhereTaskletWasScheduledTo(final int taskletId) {
+    readLock.lock(); // The map is not thread-safe by itself
+    try {
+      for (final Map.Entry<String, VortexWorkerManager> entry : runningWorkers.entrySet()) {
+        final String workerId = entry.getKey();
+        final VortexWorkerManager vortexWorkerManager = entry.getValue();
+        if (vortexWorkerManager.containsTasklet(taskletId)) {
+          return workerId;
+        }
+      }
+      return null;
+    } finally {
+      readLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
new file mode 100644
index 0000000..5d64c57
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.vortex.api.VortexFunction;
+import org.apache.reef.vortex.api.VortexFuture;
+
+import java.io.Serializable;
+
+/**
+ * Representation of user task in Driver.
+ * Bundles the tasklet id, the user function, its input, and the future that
+ * receives the outcome.
+ *
+ * @param <TInput> type of the input handed to the user function.
+ * @param <TOutput> type of the result reported through the future.
+ */
+@DriverSide
+class Tasklet<TInput extends Serializable, TOutput extends Serializable> implements Serializable {
+  private final int taskletId;
+  private final VortexFunction<TInput, TOutput> userTask;
+  private final TInput input;
+  private final VortexFuture<TOutput> vortexFuture;
+
+  /**
+   * @param taskletId id of the tasklet.
+   * @param userTask the user function of the tasklet.
+   * @param input the input of the tasklet.
+   * @param vortexFuture the future notified when the tasklet completes or throws.
+   */
+  Tasklet(final int taskletId,
+          final VortexFunction<TInput, TOutput> userTask,
+          final TInput input,
+          final VortexFuture<TOutput> vortexFuture) {
+    this.taskletId = taskletId;
+    this.userTask = userTask;
+    this.input = input;
+    this.vortexFuture = vortexFuture;
+  }
+
+  /**
+   * @return id of the tasklet
+   */
+  int getId() {
+    return this.taskletId;
+  }
+
+  /**
+   * @return the user function of the tasklet
+   */
+  VortexFunction<TInput, TOutput> getUserFunction() {
+    return this.userTask;
+  }
+
+  /**
+   * @return the input of the tasklet
+   */
+  TInput getInput() {
+    return this.input;
+  }
+
+  /**
+   * Called by VortexMaster to let the user know that the task completed.
+   * Hands the result over to the future.
+   */
+  void completed(final TOutput result) {
+    this.vortexFuture.completed(result);
+  }
+
+  /**
+   * Called by VortexMaster to let the user know that the task threw an exception.
+   * Hands the exception over to the future.
+   */
+  void threwException(final Exception exception) {
+    this.vortexFuture.threwException(exception);
+  }
+
+  /**
+   * For tests.
+   * @return whether the future of this tasklet reports that it is done.
+   */
+  boolean isCompleted() {
+    return this.vortexFuture.isDone();
+  }
+
+  /**
+   * @return description of the tasklet in string.
+   */
+  @Override
+  public String toString() {
+    return "Tasklet: " + this.taskletId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java
new file mode 100644
index 0000000..a7b9106
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.driver;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.vortex.api.VortexStart;
+
+/**
+ * Helper class for building a configuration for Vortex.
+ * Not instantiable; use the static {@code getVortexConf} method.
+ */
+@Unstable
+public final class VortexConfHelper {
+  // Value bound to VortexMasterConf.NUM_OF_VORTEX_START_THERAD; fixed to 1 for now
+  private static final int DEFAULT_NUM_OF_VORTEX_START_THREAD = 1;
+
+  private VortexConfHelper() {
+  }
+
+  /**
+   * Builds the driver configuration and the Vortex master configuration and merges them.
+   *
+   * @param jobName value set as the driver identifier.
+   * @param vortexStart user class set as VortexMasterConf.VORTEX_START.
+   * @param numOfWorkers value set as VortexMasterConf.WORKER_NUM.
+   * @param workerMemory value set as VortexMasterConf.WORKER_MEM.
+   * @param workerCores value set as VortexMasterConf.WORKER_CORES.
+   * @return Configuration for Vortex job.
+   */
+  public static Configuration getVortexConf(final String jobName,
+                                            final Class<? extends VortexStart> vortexStart,
+                                            final int numOfWorkers,
+                                            final int workerMemory,
+                                            final int workerCores) {
+    // Driver side: library location, event handlers of VortexDriver, and the job name
+    final Configuration driverConf = DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(VortexDriver.class))
+        .set(DriverConfiguration.ON_DRIVER_STARTED, VortexDriver.StartHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, VortexDriver.AllocatedEvaluatorHandler.class)
+        .set(DriverConfiguration.ON_TASK_RUNNING, VortexDriver.RunningTaskHandler.class)
+        .set(DriverConfiguration.ON_TASK_MESSAGE, VortexDriver.TaskMessageHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_FAILED, VortexDriver.FailedEvaluatorHandler.class)
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, jobName)
+        .build();
+
+    // Master side: worker resources and the user's VortexStart
+    final Configuration masterConf = VortexMasterConf.CONF
+        .set(VortexMasterConf.WORKER_NUM, numOfWorkers)
+        .set(VortexMasterConf.WORKER_MEM, workerMemory)
+        .set(VortexMasterConf.WORKER_CORES, workerCores)
+        .set(VortexMasterConf.VORTEX_START, vortexStart)
+        .set(VortexMasterConf.NUM_OF_VORTEX_START_THERAD, DEFAULT_NUM_OF_VORTEX_START_THREAD)
+        .build();
+
+    return Configurations.merge(driverConf, masterConf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
new file mode 100644
index 0000000..4dedb1d
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.driver;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.evaluator.*;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.driver.task.TaskMessage;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.vortex.api.VortexStart;
+import org.apache.reef.vortex.common.TaskletFailureReport;
+import org.apache.reef.vortex.common.TaskletResultReport;
+import org.apache.reef.vortex.common.WorkerReport;
+import org.apache.reef.vortex.evaluator.VortexWorker;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.SingleThreadStage;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * REEF Driver for Vortex.
+ * Requests evaluators, submits a {@link VortexWorker} task to every allocated evaluator and
+ * forwards worker/tasklet events to the {@link VortexMaster}.
+ */
+@Unit
+@DriverSide
+final class VortexDriver {
+  private static final Logger LOG = Logger.getLogger(VortexDriver.class.getName());
+  // The driver throws once this many evaluator failures have been observed
+  private static final int MAX_NUM_OF_FAILURES = 5;
+  private static final int SCHEDULER_EVENT = 0; // Dummy number to comply with onNext() interface
+
+  private final AtomicInteger numberOfFailures = new AtomicInteger(0);
+  private final EvaluatorRequestor evaluatorRequestor; // for requesting resources
+  private final VortexMaster vortexMaster; // Vortex Master
+  private final VortexRequestor vortexRequestor; // For sending Commands to remote workers
+
+  // Resource configuration for single thread pool
+  private final int evalMem;
+  private final int evalNum;
+  private final int evalCores;
+
+  private final EStage<VortexStart> vortexStartEStage;
+  private final VortexStart vortexStart;
+  private final EStage<Integer> pendingTaskletSchedulerEStage;
+
+  /**
+   * @param evaluatorRequestor used to request evaluators
+   * @param vortexRequestor handed to every {@link VortexWorkerManager} created by this driver
+   * @param vortexMaster receives worker and tasklet events
+   * @param vortexStartEStage stage the VortexStart is handed to when the driver starts
+   * @param vortexStart the VortexStart handed to {@code vortexStartEStage}
+   * @param pendingTaskletScheduler handler wrapped into a single-threaded stage
+   * @param workerMem memory requested per evaluator
+   * @param workerNum number of evaluators requested at start
+   * @param workerCores cores requested per evaluator, also used as the worker's number of threads
+   */
+  @Inject
+  private VortexDriver(final EvaluatorRequestor evaluatorRequestor,
+                       final VortexRequestor vortexRequestor,
+                       final VortexMaster vortexMaster,
+                       final EStage<VortexStart> vortexStartEStage,
+                       final VortexStart vortexStart,
+                       final PendingTaskletScheduler pendingTaskletScheduler,
+                       @Parameter(VortexMasterConf.WorkerMem.class) final int workerMem,
+                       @Parameter(VortexMasterConf.WorkerNum.class) final int workerNum,
+                       @Parameter(VortexMasterConf.WorkerCores.class) final int workerCores) {
+    this.vortexStartEStage = vortexStartEStage;
+    this.vortexStart = vortexStart;
+    this.pendingTaskletSchedulerEStage = new SingleThreadStage<>(pendingTaskletScheduler, 1);
+    this.evaluatorRequestor = evaluatorRequestor;
+    this.vortexMaster = vortexMaster;
+    this.vortexRequestor = vortexRequestor;
+    this.evalMem = workerMem;
+    this.evalNum = workerNum;
+    this.evalCores = workerCores;
+  }
+
+  /**
+   * Driver started.
+   * Requests the initial evaluators, then kicks off VortexStart and the pending tasklet scheduler.
+   */
+  final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      // Initial Evaluator Request
+      evaluatorRequestor.submit(EvaluatorRequest.newBuilder()
+          .setNumber(evalNum)
+          .setMemory(evalMem)
+          .setNumberOfCores(evalCores)
+          .build());
+
+      // Run Vortex Start
+      vortexStartEStage.onNext(vortexStart);
+
+      // Run Scheduler
+      pendingTaskletSchedulerEStage.onNext(SCHEDULER_EVENT);
+    }
+  }
+
+  /**
+   * Container allocated.
+   * Submits a VortexWorker task, identified by the evaluator id plus a fixed suffix, to the evaluator.
+   */
+  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+      LOG.log(Level.INFO, "Container allocated");
+      final String workerId = allocatedEvaluator.getId() + "_vortex_worker";
+
+      final Configuration workerConfiguration = VortexWorkerConf.CONF
+          .set(VortexWorkerConf.NUM_OF_THREADS, evalCores) // NUM_OF_THREADS = evalCores
+          .build();
+
+      final Configuration taskConfiguration = TaskConfiguration.CONF
+          .set(TaskConfiguration.IDENTIFIER, workerId)
+          .set(TaskConfiguration.TASK, VortexWorker.class)
+          .set(TaskConfiguration.ON_SEND_MESSAGE, VortexWorker.class)
+          .set(TaskConfiguration.ON_MESSAGE, VortexWorker.DriverMessageHandler.class)
+          .set(TaskConfiguration.ON_CLOSE, VortexWorker.TaskCloseHandler.class)
+          .build();
+
+      allocatedEvaluator.submitTask(Configurations.merge(workerConfiguration, taskConfiguration));
+    }
+  }
+
+  /**
+   * Evaluator up and running.
+   * Registers the running worker task with the VortexMaster.
+   */
+  final class RunningTaskHandler implements EventHandler<RunningTask> {
+    @Override
+    public void onNext(final RunningTask reefTask) {
+      LOG.log(Level.INFO, "Worker up and running");
+      vortexMaster.workerAllocated(new VortexWorkerManager(vortexRequestor, reefTask));
+    }
+  }
+
+  /**
+   * Message received.
+   * Deserializes the {@link WorkerReport} carried by the message and dispatches it to the VortexMaster.
+   */
+  final class TaskMessageHandler implements EventHandler<TaskMessage> {
+    @Override
+    public void onNext(final TaskMessage taskMessage) {
+      final String workerId = taskMessage.getId();
+      final WorkerReport workerReport = (WorkerReport)SerializationUtils.deserialize(taskMessage.get());
+      switch (workerReport.getType()) {
+      case TaskletResult:
+        final TaskletResultReport taskletResultReport = (TaskletResultReport)workerReport;
+        vortexMaster.taskletCompleted(workerId, taskletResultReport.getTaskletId(), taskletResultReport.getResult());
+        break;
+      case TaskletFailure:
+        final TaskletFailureReport taskletFailureReport = (TaskletFailureReport)workerReport;
+        vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletId(), taskletFailureReport.getException());
+        break;
+      default:
+        throw new RuntimeException("Unknown Report");
+      }
+    }
+  }
+
+  /**
+   * Evaluator preempted.
+   * Requests a replacement evaluator and, if a worker task was lost with the evaluator,
+   * reports that worker to the VortexMaster.
+   * TODO[REEF-501]: Distinguish different types of FailedEvaluator in Vortex.
+   */
+  final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
+    @Override
+    public void onNext(final FailedEvaluator failedEvaluator) {
+      LOG.log(Level.INFO, "Evaluator preempted");
+      if (numberOfFailures.incrementAndGet() >= MAX_NUM_OF_FAILURES) {
+        throw new RuntimeException("Exceeded max number of failures");
+      }
+
+      // We request a new evaluator to take the place of the preempted one
+      evaluatorRequestor.submit(EvaluatorRequest.newBuilder()
+          .setNumber(1)
+          .setMemory(evalMem)
+          .setNumberOfCores(evalCores)
+          .build());
+
+      // The failed task is optional: guard the access instead of calling get() blindly,
+      // so an evaluator that fails without a task does not make this handler throw.
+      if (failedEvaluator.getFailedTask().isPresent()) {
+        vortexMaster.workerPreempted(failedEvaluator.getFailedTask().get().getId());
+      } else {
+        LOG.log(Level.WARNING, "Evaluator {0} failed without a failed task; skipping workerPreempted",
+            failedEvaluator.getId());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexLauncher.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexLauncher.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexLauncher.java
new file mode 100644
index 0000000..b288824
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexLauncher.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.driver;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.vortex.api.VortexStart;
+
+/**
+ * Static entry point for launching Vortex jobs.
+ */
+@Unstable
+public final class VortexLauncher {
+  /** Value bound to the local runtime's MAX_NUMBER_OF_EVALUATORS option. */
+  private static final int MAX_NUMBER_OF_EVALUATORS = 10;
+
+  /** Not instantiable; all members are static. */
+  private VortexLauncher() {
+  }
+
+  /**
+   * Launch a Vortex job using the local runtime.
+   *
+   * @param jobName name of the job, forwarded to VortexConfHelper
+   * @param vortexUserCode the user's VortexStart implementation
+   * @param numOfWorkers number of workers
+   * @param workerMemory memory per worker
+   * @param workerCores cores per worker
+   * @return the status returned by the DriverLauncher run
+   * @throws RuntimeException wrapping the InjectionException if the launcher cannot be created or run
+   */
+  public static LauncherStatus launchLocal(final String jobName,
+                                           final Class<? extends VortexStart> vortexUserCode,
+                                           final int numOfWorkers,
+                                           final int workerMemory,
+                                           final int workerCores) {
+    final Configuration localRuntimeConf = LocalRuntimeConfiguration.CONF
+        .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS)
+        .build();
+    final Configuration jobConf =
+        VortexConfHelper.getVortexConf(jobName, vortexUserCode, numOfWorkers, workerMemory, workerCores);
+
+    try {
+      final DriverLauncher driverLauncher = DriverLauncher.getLauncher(localRuntimeConf);
+      return driverLauncher.run(jobConf);
+    } catch (final InjectionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
new file mode 100644
index 0000000..c9da30d
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.driver;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.vortex.api.VortexFunction;
+import org.apache.reef.vortex.api.VortexFuture;
+
+import java.io.Serializable;
+
+/**
+ * The heart of Vortex.
+ * Central sink for the tasklet- and worker-related events and requests raised by the
+ * other components of the system.
+ */
+@Unstable
+@DriverSide
+@DefaultImplementation(DefaultVortexMaster.class)
+public interface VortexMaster {
+  /**
+   * Submit a new Tasklet to be run sometime in the future.
+   *
+   * @param vortexFunction the function the Tasklet runs
+   * @param input the input handed to the function
+   * @param <TInput> input type of the function
+   * @param <TOutput> output type of the function
+   * @return a future typed with the function's output
+   */
+  <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput>
+      enqueueTasklet(VortexFunction<TInput, TOutput> vortexFunction, TInput input);
+
+  /**
+   * Call this when a new worker is up and running.
+   *
+   * @param vortexWorkerManager manager of the newly running worker
+   */
+  void workerAllocated(VortexWorkerManager vortexWorkerManager);
+
+  /**
+   * Call this when a worker is preempted.
+   *
+   * @param id identifier of the preempted worker
+   */
+  void workerPreempted(String id);
+
+  /**
+   * Call this when a Tasklet is completed.
+   *
+   * @param workerId identifier of the worker that reported the result
+   * @param taskletId identifier of the completed Tasklet
+   * @param result the result reported for the Tasklet
+   */
+  void taskletCompleted(String workerId, int taskletId, Serializable result);
+
+  /**
+   * Call this when a Tasklet errored.
+   *
+   * @param workerId identifier of the worker that reported the failure
+   * @param taskletId identifier of the failed Tasklet
+   * @param exception the exception reported for the Tasklet
+   */
+  void taskletErrored(String workerId, int taskletId, Exception exception);
+
+  /**
+   * Release all resources and shut down.
+   */
+  void terminate();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java
new file mode 100644
index 0000000..41b7ab2
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.driver;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.RequiredImpl;
+import org.apache.reef.tang.formats.RequiredParameter;
+import org.apache.reef.vortex.api.VortexStart;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.StageConfiguration;
+import org.apache.reef.wake.impl.ThreadPoolStage;
+
+/**
+ * Vortex Master configuration.
+ * Each named parameter is declared next to the module parameter that is bound to it in {@link #CONF}.
+ */
+@Unstable
+@DriverSide
+public final class VortexMasterConf extends ConfigurationModuleBuilder {
+  /**
+   * Named parameter: Number of Workers.
+   */
+  @NamedParameter(doc = "Number of Workers")
+  final class WorkerNum implements Name<Integer> {
+  }
+
+  /**
+   * Module parameter for the Number of Workers.
+   */
+  public static final RequiredParameter<Integer> WORKER_NUM = new RequiredParameter<>();
+
+  /**
+   * Named parameter: Worker Memory.
+   */
+  @NamedParameter(doc = "Worker Memory")
+  final class WorkerMem implements Name<Integer> {
+  }
+
+  /**
+   * Module parameter for the Worker Memory.
+   */
+  public static final RequiredParameter<Integer> WORKER_MEM = new RequiredParameter<>();
+
+  /**
+   * Named parameter: Worker Cores.
+   */
+  @NamedParameter(doc = "Worker Cores")
+  final class WorkerCores implements Name<Integer> {
+  }
+
+  /**
+   * Module parameter for the Worker Cores.
+   */
+  public static final RequiredParameter<Integer> WORKER_CORES = new RequiredParameter<>();
+
+  /**
+   * Module parameter for the Vortex Start implementation.
+   */
+  public static final RequiredImpl<VortexStart> VORTEX_START = new RequiredImpl<>();
+
+  /**
+   * Module parameter for the Number of Vortex Start threads.
+   */
+  public static final RequiredParameter<Integer> NUM_OF_VORTEX_START_THERAD = new RequiredParameter<>();
+
+  /**
+   * Vortex Master configuration.
+   * Declared after all module parameters because it reads them during static initialization.
+   */
+  public static final ConfigurationModule CONF = new VortexMasterConf()
+      .bindNamedParameter(WorkerNum.class, WORKER_NUM)
+      .bindNamedParameter(WorkerMem.class, WORKER_MEM)
+      .bindNamedParameter(WorkerCores.class, WORKER_CORES)
+      .bindImplementation(VortexStart.class, VORTEX_START)
+      .bindNamedParameter(StageConfiguration.NumberOfThreads.class, NUM_OF_VORTEX_START_THERAD)
+      .bindNamedParameter(StageConfiguration.StageHandler.class, VortexStartExecutor.class)
+      .bindImplementation(EStage.class, ThreadPoolStage.class)
+      .build();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
new file mode 100644
index 0000000..2c02d6e
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.driver;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.vortex.common.VortexRequest;
+
+import javax.inject.Inject;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Takes the serialization work from the scheduler thread.
+ * Requests are serialized and sent from a cached thread pool instead of the caller's thread.
+ */
+@DriverSide
+class VortexRequestor {
+  private final ExecutorService senderPool = Executors.newCachedThreadPool();
+
+  @Inject
+  VortexRequestor() {
+  }
+
+  /**
+   * Serialize the request and send it to the given task on a pool thread.
+   *
+   * @param reefTask the running task to send the request to
+   * @param vortexRequest the request to serialize and send
+   */
+  void send(final RunningTask reefTask, final VortexRequest vortexRequest) {
+    final Runnable serializeAndSend = new Runnable() {
+      @Override
+      public void run() {
+        //  Possible race condition with VortexWorkerManager#terminate is addressed by the global lock in VortexMaster
+        final byte[] serializedRequest = SerializationUtils.serialize(vortexRequest);
+        reefTask.send(serializedRequest);
+      }
+    };
+    senderPool.execute(serializeAndSend);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexStartExecutor.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexStartExecutor.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexStartExecutor.java
new file mode 100644
index 0000000..6a5ae4b
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexStartExecutor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.vortex.api.VortexStart;
+import org.apache.reef.vortex.api.VortexThreadPool;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Thread pool that executes VortexStart.
+ */
+@DriverSide
+final class VortexStartExecutor implements EventHandler<VortexStart> {
+  private final VortexThreadPool vortexThreadPool; // ThreadPool-like API for VortexStart to use
+  private final VortexMaster vortexMaster;
+
+  /**
+   * @param vortexThreadPool the thread pool handed to the VortexStart
+   * @param vortexMaster the master that is terminated once the VortexStart returns
+   */
+  @Inject
+  private VortexStartExecutor(final VortexThreadPool vortexThreadPool,
+                             final VortexMaster vortexMaster) {
+    this.vortexThreadPool = vortexThreadPool;
+    this.vortexMaster = vortexMaster;
+  }
+
+  /**
+   * Execute the passed VortexStart and terminate right after.
+   * The master is terminated even when the VortexStart throws; the exception still propagates.
+   * @param vortexStart to execute
+   */
+  @Override
+  public void onNext(final VortexStart vortexStart) {
+    try {
+      vortexStart.start(vortexThreadPool);
+    } finally {
+      // Without the finally, an exception from user code would skip terminate()
+      // and the master would never be told to release its resources.
+      vortexMaster.terminate();
+    }
+  }
+}
\ No newline at end of file


Reply via email to