[REEF-491] Initial Vortex implementation This addressed the issue by * Creating a Java ThreadPool-like user interface * Enabling distributed execution of user-submitted tasks * Developing a mechanism for handling of resource preemption events * Developing simple task scheduling mechanisms * Creating simple unit/integration tests to verify correctness.
JIRA: [REEF-491](https://issues.apache.org/jira/browse/REEF-491) Pull Request: Closes #310 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/2c994ec9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/2c994ec9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/2c994ec9 Branch: refs/heads/master Commit: 2c994ec9714c2ce30f237c1b808e6335019e754b Parents: 0181e2c Author: John Yang <[email protected]> Authored: Tue Jun 16 10:46:43 2015 +0800 Committer: Byung-Gon Chun <[email protected]> Committed: Tue Aug 4 15:41:07 2015 +0900 ---------------------------------------------------------------------- lang/java/reef-applications/pom.xml | 8 - .../reef-applications/reef-vortex/README.md | 99 +++++++ lang/java/reef-applications/reef-vortex/pom.xml | 62 ++++- .../org/apache/reef/vortex/VortexDriver.java | 60 ----- .../apache/reef/vortex/api/VortexFunction.java | 43 ++++ .../apache/reef/vortex/api/VortexFuture.java | 106 ++++++++ .../org/apache/reef/vortex/api/VortexStart.java | 34 +++ .../reef/vortex/api/VortexThreadPool.java | 50 ++++ .../apache/reef/vortex/api/package-info.java | 22 ++ .../vortex/common/TaskletExecutionRequest.java | 68 +++++ .../vortex/common/TaskletFailureReport.java | 61 +++++ .../reef/vortex/common/TaskletResultReport.java | 64 +++++ .../reef/vortex/common/VortexRequest.java | 41 +++ .../apache/reef/vortex/common/WorkerReport.java | 42 +++ .../apache/reef/vortex/common/package-info.java | 22 ++ .../reef/vortex/driver/DefaultVortexMaster.java | 108 ++++++++ .../vortex/driver/PendingTaskletScheduler.java | 59 +++++ .../reef/vortex/driver/PendingTasklets.java | 51 ++++ .../reef/vortex/driver/RunningWorkers.java | 257 +++++++++++++++++++ .../org/apache/reef/vortex/driver/Tasklet.java | 96 +++++++ .../reef/vortex/driver/VortexConfHelper.java | 66 +++++ .../apache/reef/vortex/driver/VortexDriver.java | 193 ++++++++++++++ 
.../reef/vortex/driver/VortexLauncher.java | 62 +++++ .../apache/reef/vortex/driver/VortexMaster.java | 67 +++++ .../reef/vortex/driver/VortexMasterConf.java | 98 +++++++ .../reef/vortex/driver/VortexRequestor.java | 50 ++++ .../reef/vortex/driver/VortexStartExecutor.java | 52 ++++ .../reef/vortex/driver/VortexWorkerConf.java | 53 ++++ .../reef/vortex/driver/VortexWorkerManager.java | 95 +++++++ .../apache/reef/vortex/driver/package-info.java | 22 ++ .../reef/vortex/evaluator/VortexWorker.java | 160 ++++++++++++ .../reef/vortex/evaluator/package-info.java | 22 ++ .../reef/vortex/examples/addone/AddOne.java | 36 +++ .../vortex/examples/addone/AddOneFunction.java | 34 +++ .../vortex/examples/addone/AddOneStart.java | 67 +++++ .../vortex/examples/addone/package-info.java | 22 ++ .../reef/vortex/examples/hello/HelloVortex.java | 36 +++ .../examples/hello/HelloVortexFunction.java | 37 +++ .../vortex/examples/hello/HelloVortexStart.java | 48 ++++ .../vortex/examples/hello/package-info.java | 22 ++ .../org/apache/reef/vortex/package-info.java | 2 +- .../vortex/driver/DefaultVortexMasterTest.java | 158 ++++++++++++ .../reef/vortex/driver/RunningWorkersTest.java | 64 +++++ .../org/apache/reef/vortex/driver/TestUtil.java | 66 +++++ .../apache/reef/vortex/driver/package-info.java | 22 ++ lang/java/reef-tests/pom.xml | 5 + .../org/apache/reef/tests/AllTestsSuite.java | 4 +- .../applications/ApplicationTestSuite.java | 30 +++ .../reef/tests/applications/package-info.java | 22 ++ .../applications/vortex/VortexTestSuite.java | 30 +++ .../vortex/addone/AddOneFunction.java | 34 +++ .../applications/vortex/addone/AddOneTest.java | 63 +++++ .../vortex/addone/AddOneTestStart.java | 67 +++++ .../vortex/addone/package-info.java | 22 ++ .../tests/applications/vortex/package-info.java | 22 ++ 55 files changed, 3135 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/pom.xml b/lang/java/reef-applications/pom.xml index 828ace3..b7ddb2c 100644 --- a/lang/java/reef-applications/pom.xml +++ b/lang/java/reef-applications/pom.xml @@ -31,14 +31,6 @@ under the License. <relativePath>../../..</relativePath> </parent> - <dependencies> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>reef-common</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> - <modules> <module>reef-vortex</module> </modules> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/README.md ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/README.md b/lang/java/reef-applications/reef-vortex/README.md new file mode 100644 index 0000000..0ea6518 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/README.md @@ -0,0 +1,99 @@ +Vortex +======================== +Vortex is a runtime for low-priority jobs to effectively cope +with non-cooperative(thus low-latency) preemptions with the following goals: + +* Enable high-priority jobs such as latency-sensitive online serving jobs to reclaim resources quickly +* Enable low-priority jobs such as offline batch data analytics jobs to efficiently utilize volatile resources + +Vortex is currently under [active development](https://issues.apache.org/jira/browse/REEF-364) + +Key Components +======================== +* ThreadPool API + * Users can submit custom functions and inputs to Vortex and retrieve results + * This API hides the details of distributed execution on unreliable resources(e.g. 
handling of preemption) +* Tasklet + * User-submitted function and input are translated into Tasklet(s) in VortexMaster + * A Tasklet is then scheduled to and executed by a VortexWorker +* Queue + * VortexMaster Queue(in REEF Driver): Keeps track of Tasklets waiting to be scheduled/enqueued to a VortexWorker Queue + * VortexWorker Queue(in REEF Evaluator): Keeps track of Tasklets waiting to be executed + +Example: Vector Calculation on Vortex +==================== +```java +/** + * User's main function. + */ +public final class AddOne { + private AddOne() { + } + + public static void main(final String[] args) { + VortexLauncher.launchLocal("Vortex_Example_AddOne", AddOneStart.class, 2, 1024, 4); + } +} +``` + +* Through `VortexLauncher` the user launches a Vortex job, passing the following arguments + * Name of the job + * Implementation of VortexStart(`AddOneStart`) + * Amount of resources to use + +```java +/** + * AddOne User Code Example. + */ +final class AddOneStart implements VortexStart { + @Inject + public AddOneStart() { + } + + @Override + public void start(final VortexThreadPool vortexThreadPool) { + final Vector<Integer> inputVector = new Vector<>(); + for (int i = 0; i < 1000; i++) { + inputVector.add(i); + } + + final List<VortexFuture<Integer>> futures = new ArrayList<>(); + final AddOneFunction addOneFunction = new AddOneFunction(); + for (final int i : inputVector) { + futures.add(vortexThreadPool.submit(addOneFunction, i)); + } + + final Vector<Integer> outputVector = new Vector<>(); + for (final VortexFuture<Integer> future : futures) { + try { + outputVector.add(future.get()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + System.out.println("RESULT:"); + System.out.println(outputVector); + } +} +``` + +* Through `VortexThreadPool#submit`, the user submits a custom function(`AddOneFunction`) and its input to be executed on Vortex runtime +* Using the returned `VortexFuture`, the user retrieves execution 
results + +```java +/** + * Outputs input + 1. + */ +final class AddOneFunction implements VortexFunction<Integer, Integer> { + @Override + public Integer call(final Integer input) throws Exception { + return input + 1; + } +} +``` +* User implementation of VortexFunction, takes an integer as the input and outputs input+1 + + + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/pom.xml b/lang/java/reef-applications/reef-vortex/pom.xml index 175c4a5..2f47ab2 100644 --- a/lang/java/reef-applications/reef-vortex/pom.xml +++ b/lang/java/reef-applications/reef-vortex/pom.xml @@ -20,7 +20,7 @@ under the License. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <artifactId>reef-vortex</artifactId> + <artifactId>vortex</artifactId> <name>REEF Vortex</name> <parent> @@ -28,4 +28,64 @@ under the License. 
<artifactId>reef-applications</artifactId> <version>0.12.0-incubating-SNAPSHOT</version> </parent> + + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-runtime-local</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + </execution> + </executions> + <configuration> + <outputFile> + ${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar + </outputFile> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>${maven-checkstyle-plugin.version}</version> + <executions> + <execution> + <id>validate</id> + <phase>validate</phase> + <goals> + <goal>check</goal> + <goal>checkstyle</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/VortexDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/VortexDriver.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/VortexDriver.java deleted file mode 100644 index d3a4c70..0000000 --- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/VortexDriver.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.vortex; - -import org.apache.reef.driver.evaluator.AllocatedEvaluator; -import org.apache.reef.driver.evaluator.EvaluatorRequest; -import org.apache.reef.driver.evaluator.EvaluatorRequestor; -import org.apache.reef.tang.annotations.Unit; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.time.event.StartTime; - -import javax.inject.Inject; -import java.util.logging.Level; -import java.util.logging.Logger; - -@Unit -public final class VortexDriver { - private static final Logger LOG = Logger.getLogger(VortexDriver.class.getName()); - private final EvaluatorRequestor requestor; - - @Inject - public VortexDriver(final EvaluatorRequestor requestor) { - this.requestor = requestor; - } - - public final class StartHandler implements EventHandler<StartTime> { - @Override - public void onNext(final StartTime startTime) { - requestor.submit(EvaluatorRequest.newBuilder() - .setNumber(1) - .setMemory(64) - .setNumberOfCores(1) - .build()); - } - } - - public final class EvaluatorAllocatedHandler 
implements EventHandler<AllocatedEvaluator> { - @Override - public void onNext(final AllocatedEvaluator allocatedEvaluator) { - LOG.log(Level.INFO, "Evaluator Allocated: {0}", allocatedEvaluator); - allocatedEvaluator.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java new file mode 100644 index 0000000..96e47b6 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.api; + +import org.apache.reef.annotations.Unstable; + +import java.io.Serializable; + +/** + * Typed user function. + * Implement your functions using this interface. + * TODO[REEF-504]: Clean up Serializable in Vortex. 
+ * + * @param <TInput> input type + * @param <TOutput> output type + */ +@Unstable +public interface VortexFunction<TInput extends Serializable, TOutput extends Serializable> extends Serializable { + /** + * @param input of the function + * @return output of the function + * @throws Exception thrown here will bubble up in VortexFuture#get as ExecutionException + * Exception should be thrown only after all resources are released as they cannot be cleared by Vortex + * For example if threads are spawned here, shut them down before throwing an exception + */ + TOutput call(TInput input) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java new file mode 100644 index 0000000..09c9caf --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.api; + +import org.apache.reef.annotations.Unstable; + +import java.util.concurrent.*; + +/** + * The interface between user code and submitted task. + * TODO[REEF-505]: Callback features for VortexFuture. + */ +@Unstable +public final class VortexFuture<TOutput> implements Future<TOutput> { + private TOutput userResult; + private Exception userException; + private final CountDownLatch countDownLatch = new CountDownLatch(1); + + /** + * TODO[REEF-502]: Support Vortex Tasklet(s) cancellation by user. + */ + @Override + public boolean cancel(final boolean mayInterruptIfRunning) { + throw new UnsupportedOperationException("Cancel not yet supported"); + } + + /** + * TODO[REEF-502]: Support Vortex Tasklet(s) cancellation by user. + */ + @Override + public boolean isCancelled() { + throw new UnsupportedOperationException("Cancel not yet supported"); + } + + /** + * @return true if the task completed, false if not. + */ + @Override + public boolean isDone() { + return countDownLatch.getCount() == 0; + } + + /** + * Infinitely wait for the result of the task. + */ + @Override + public TOutput get() throws InterruptedException, ExecutionException { + countDownLatch.await(); + if (userResult != null) { + return userResult; + } else { + assert(userException != null); + throw new ExecutionException(userException); + } + } + + /** + * Wait a certain period of time for the result of the task. 
+ */ + @Override + public TOutput get(final long timeout, final TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + if (!countDownLatch.await(timeout, unit)) { + throw new TimeoutException(); + } + + if (userResult != null) { + return userResult; + } else { + assert(userException != null); + throw new ExecutionException(userException); + } + } + + /** + * Called by VortexMaster to let the user know that the task completed. + */ + public void completed(final TOutput result) { + this.userResult = result; + this.countDownLatch.countDown(); + } + + /** + * Called by VortexMaster to let the user know that the task threw an exception. + */ + public void threwException(final Exception exception) { + this.userException = exception; + this.countDownLatch.countDown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexStart.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexStart.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexStart.java new file mode 100644 index 0000000..50f9308 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexStart.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.api; + +import org.apache.reef.annotations.Unstable; + +/** + * The starting point of Vortex user thread. + */ +@Unstable +public interface VortexStart { + /** + * Implement this method, using VortexThreadPool to run jobs on Vortex. + * Your implementation of this interface will be instantiated by Tang + * and this method will be called upon the start of REEF Driver. + */ + void start(final VortexThreadPool vortexThreadPool); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java new file mode 100644 index 0000000..25c5c90 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.api; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.vortex.driver.VortexMaster; + +import javax.inject.Inject; +import java.io.Serializable; + +/** + * Distributed thread pool. + */ +@Unstable +public final class VortexThreadPool { + private final VortexMaster vortexMaster; + + @Inject + private VortexThreadPool(final VortexMaster vortexMaster) { + this.vortexMaster = vortexMaster; + } + + /** + * @param function to run on Vortex + * @param input of the function + * @param <TInput> input type + * @param <TOutput> output type + * @return VortexFuture for tracking execution progress + */ + public <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput> + submit(final VortexFunction<TInput, TOutput> function, final TInput input) { + return vortexMaster.enqueueTasklet(function, input); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/package-info.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/package-info.java new file mode 100644 index 0000000..9a7633e --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Vortex user api for running jobs on Vortex. + */ +package org.apache.reef.vortex.api; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java new file mode 100644 index 0000000..2c60c8a --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.common; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.vortex.api.VortexFunction; + +import java.io.Serializable; + +/** + * Request to execute a tasklet. + */ +@Unstable +public final class TaskletExecutionRequest<TInput extends Serializable, TOutput extends Serializable> + implements VortexRequest { + private final int taskletId; + private final VortexFunction<TInput, TOutput> userFunction; + private final TInput input; + + /** + * @return the type of this VortexRequest. + */ + @Override + public RequestType getType() { + return RequestType.ExecuteTasklet; + } + + /** + * Vortex Master -> Vortex Worker request to execute a tasklet. + */ + public TaskletExecutionRequest(final int taskletId, + final VortexFunction<TInput, TOutput> userFunction, + final TInput input) { + this.taskletId = taskletId; + this.userFunction = userFunction; + this.input = input; + } + + /** + * Execute the function using the input. + */ + public TOutput execute() throws Exception { + return userFunction.call(input); + } + + /** + * Get id of the tasklet. 
+ */ + public int getTaskletId() { + return taskletId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java new file mode 100644 index 0000000..dc847e7 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.common; + +import org.apache.reef.annotations.Unstable; + +/** + * Report of a tasklet exception. + */ +@Unstable +public final class TaskletFailureReport implements WorkerReport { + private final int taskletId; + private final Exception exception; + + /** + * @param taskletId of the failed tasklet. + * @param exception that caused the tasklet failure. 
+ */ + public TaskletFailureReport(final int taskletId, final Exception exception) { + this.taskletId = taskletId; + this.exception = exception; + } + + /** + * @return the type of this WorkerReport. + */ + @Override + public WorkerReportType getType() { + return WorkerReportType.TaskletFailure; + } + + /** + * @return the id of the tasklet. + */ + public int getTaskletId() { + return taskletId; + } + + /** + * @return the exception that caused the tasklet failure. + */ + public Exception getException() { + return exception; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java new file mode 100644 index 0000000..0a6fab3 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.common; + +import org.apache.reef.annotations.Unstable; + +import java.io.Serializable; + +/** + * Report of a tasklet execution result. + */ +@Unstable +public final class TaskletResultReport<TOutput extends Serializable> implements WorkerReport { + private final int taskletId; + private final TOutput result; + + /** + * @param taskletId of the tasklet. + * @param result of the tasklet execution. + */ + public TaskletResultReport(final int taskletId, final TOutput result) { + this.taskletId = taskletId; + this.result = result; + } + + /** + * @return the type of this WorkerReport. + */ + @Override + public WorkerReportType getType() { + return WorkerReportType.TaskletResult; + } + + /** + * @return the id of the tasklet. + */ + public int getTaskletId() { + return taskletId; + } + + /** + * @return the result of the tasklet execution. + */ + public TOutput getResult() { + return result; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java new file mode 100644 index 0000000..967ab98 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.reef.vortex.common;

import org.apache.reef.annotations.Unstable;

import java.io.Serializable;

/**
 * Message of the Master -> Worker protocol.
 * Every request is serializable and identifies itself through {@link #getType()}.
 */
@Unstable
public interface VortexRequest extends Serializable {
  /**
   * Kinds of request the master can send to a worker.
   */
  enum RequestType {
    ExecuteTasklet
  }

  /**
   * @return the kind of this request.
   */
  RequestType getType();
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java new file mode 100644 index 0000000..4444f3c --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.reef.vortex.common;

import org.apache.reef.annotations.Unstable;

import java.io.Serializable;

/**
 * Message of the Worker -> Master protocol.
 * Every report is serializable and identifies itself through {@link #getType()}.
 */
@Unstable
public interface WorkerReport extends Serializable {
  /**
   * Kinds of report a worker can send to the master.
   */
  enum WorkerReportType {
    TaskletResult,
    TaskletFailure
  }

  /**
   * @return the kind of this report.
   */
  WorkerReportType getType();
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/package-info.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/package-info.java new file mode 100644 index 0000000..35cf59e --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Vortex Code used both in Vortex Driver and Vortex Evaluator. + */ +package org.apache.reef.vortex.common; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java new file mode 100644 index 0000000..f1a55b7 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.reef.vortex.driver;

import net.jcip.annotations.ThreadSafe;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.vortex.api.VortexFunction;
import org.apache.reef.vortex.api.VortexFuture;

import javax.inject.Inject;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Default implementation of VortexMaster.
 * Delegates to two collaborators: {@link PendingTasklets} holds tasklets that still have to be
 * launched, {@link RunningWorkers} tracks the workers and forwards completion/failure events.
 */
@ThreadSafe
@DriverSide
final class DefaultVortexMaster implements VortexMaster {
  private final AtomicInteger taskletIdCounter = new AtomicInteger();
  private final RunningWorkers runningWorkers;
  private final PendingTasklets pendingTasklets;

  /**
   * @param runningWorkers for managing all running workers.
   * @param pendingTasklets queue of tasklets waiting to be launched.
   */
  @Inject
  DefaultVortexMaster(final RunningWorkers runningWorkers,
                      final PendingTasklets pendingTasklets) {
    this.runningWorkers = runningWorkers;
    this.pendingTasklets = pendingTasklets;
  }

  /**
   * Wraps the function and its input into a new tasklet, appends it to the pending queue
   * and hands back the future through which the tasklet's outcome is reported.
   */
  @Override
  public <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput>
      enqueueTasklet(final VortexFunction<TInput, TOutput> function, final TInput input) {
    // TODO[REEF-500]: Simple duplicate Vortex Tasklet launch.
    final VortexFuture<TOutput> future = new VortexFuture<>();
    final int newTaskletId = taskletIdCounter.getAndIncrement();
    pendingTasklets.addLast(new Tasklet<>(newTaskletId, function, input, future));
    return future;
  }

  /**
   * Registers a newly available worker with runningWorkers.
   */
  @Override
  public void workerAllocated(final VortexWorkerManager vortexWorkerManager) {
    this.runningWorkers.addWorker(vortexWorkerManager);
  }

  /**
   * Drops the worker from runningWorkers and re-queues, at the head of the pending queue,
   * every tasklet that removeWorker reports as lost.
   */
  @Override
  public void workerPreempted(final String id) {
    for (final Tasklet lostTasklet : this.runningWorkers.removeWorker(id)) {
      this.pendingTasklets.addFirst(lostTasklet);
    }
  }

  /**
   * Forwards a tasklet completion to runningWorkers.
   */
  @Override
  public void taskletCompleted(final String workerId, final int taskletId, final Serializable result) {
    this.runningWorkers.completeTasklet(workerId, taskletId, result);
  }

  /**
   * Forwards a tasklet failure to runningWorkers.
   */
  @Override
  public void taskletErrored(final String workerId, final int taskletId, final Exception exception) {
    this.runningWorkers.errorTasklet(workerId, taskletId, exception);
  }

  /**
   * Terminates the job by terminating runningWorkers.
   */
  @Override
  public void terminate() {
    this.runningWorkers.terminate();
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletScheduler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletScheduler.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletScheduler.java new file mode 100644 index 0000000..6a0b8b0 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletScheduler.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.driver; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.wake.EventHandler; + +import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Pending Tasklet Scheduler.
+ */ +@DriverSide +final class PendingTaskletScheduler implements EventHandler<Integer> { + private static final Logger LOG = Logger.getLogger(PendingTaskletScheduler.class.getName()); + + private final RunningWorkers runningWorkers; + private final PendingTasklets pendingTasklets; + + @Inject + private PendingTaskletScheduler(final RunningWorkers runningWorkers, + final PendingTasklets pendingTasklets) { + this.runningWorkers = runningWorkers; + this.pendingTasklets = pendingTasklets; + } + + /** + * Repeatedly take a tasklet from the pending queue and schedule/launch it via RunningWorkers. + */ + @Override + public void onNext(final Integer integer) { + while (!runningWorkers.isTerminated()) { + try { + final Tasklet tasklet = pendingTasklets.takeFirst(); // blocks when no tasklet exists + runningWorkers.launchTasklet(tasklet); // blocks when no worker exists + } catch (InterruptedException e) { + LOG.log(Level.INFO, "Interrupted upon termination"); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java new file mode 100644 index 0000000..eac4fc6 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.reef.vortex.driver;

import net.jcip.annotations.ThreadSafe;
import org.apache.reef.annotations.audience.DriverSide;

import javax.inject.Inject;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * Keeps track of all pending tasklets in a blocking deque.
 */
@ThreadSafe
@DriverSide
final class PendingTasklets {
  private final BlockingDeque<Tasklet> deque = new LinkedBlockingDeque<>();

  @Inject
  PendingTasklets() {
  }

  /**
   * Appends the tasklet to the tail of the queue.
   */
  void addLast(final Tasklet tasklet) {
    this.deque.addLast(tasklet);
  }

  /**
   * Inserts the tasklet at the head of the queue, so it is the next one taken.
   */
  void addFirst(final Tasklet tasklet) {
    this.deque.addFirst(tasklet);
  }

  /**
   * Removes and returns the tasklet at the head of the queue, waiting while the queue is empty.
   *
   * @throws InterruptedException if interrupted while waiting.
   */
  Tasklet takeFirst() throws InterruptedException {
    return this.deque.takeFirst();
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java new file mode 100644 index 0000000..f712786 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java @@
-0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.driver; + +import net.jcip.annotations.ThreadSafe; +import org.apache.reef.annotations.audience.DriverSide; + +import javax.inject.Inject; +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Keeps track of all running VortexWorkers and Tasklets. + * Upon Tasklet launch request, randomly schedules it to a VortexWorkerManager. 
+ */
@ThreadSafe
@DriverSide
final class RunningWorkers {
  // RunningWorkers and its locks
  private final HashMap<String, VortexWorkerManager> runningWorkers = new HashMap<>(); // Running workers/tasklets
  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  private final Lock readLock = rwLock.readLock(); // Need to acquire this to launch/complete tasklets
  private final Lock writeLock = rwLock.writeLock(); // Need to acquire this to add/remove worker
  private final Condition noRunningWorker = writeLock.newCondition(); // When there's no running worker

  // To keep track of workers that are preempted before acknowledged
  private final Set<String> removedBeforeAddedWorkers = new HashSet<>();

  // Shared generator for random worker selection (avoids allocating one per launch)
  private final Random random = new Random();

  // Terminated
  private volatile boolean terminated = false;

  /**
   * RunningWorkers constructor.
   */
  @Inject
  RunningWorkers() {
  }

  /**
   * Registers a worker, or terminates it right away if this instance is already terminated.
   * A worker whose id was already passed to {@link #removeWorker(String)} is silently ignored.
   *
   * Concurrency: Called by multiple threads.
   * Parameter: Called exactly once per vortexWorkerManager.
   */
  void addWorker(final VortexWorkerManager vortexWorkerManager) {
    if (!terminated) {
      writeLock.lock();
      try {
        if (!terminated) { // Re-check under the lock: terminate() may have run in between
          if (!removedBeforeAddedWorkers.contains(vortexWorkerManager.getId())) {
            this.runningWorkers.put(vortexWorkerManager.getId(), vortexWorkerManager);

            // Notify (possibly) waiting scheduler
            if (runningWorkers.size() == 1) {
              noRunningWorker.signalAll();
            }
          }
          return;
        }
      } finally {
        writeLock.unlock();
      }
    }

    // Terminate the worker
    vortexWorkerManager.terminate();
  }

  /**
   * Unregisters a worker.
   *
   * Concurrency: Called by multiple threads.
   * Parameter: Called exactly once per id.
   *
   * @return whatever the removed worker's {@code removed()} returns; an empty collection if the
   *         worker is unknown (its id is then remembered so a later addWorker is ignored) or if
   *         this instance is already terminated.
   */
  Collection<Tasklet> removeWorker(final String id) {
    if (!terminated) {
      writeLock.lock();
      try {
        if (!terminated) { // Re-check under the lock: terminate() may have run in between
          final VortexWorkerManager vortexWorkerManager = this.runningWorkers.remove(id);
          if (vortexWorkerManager != null) {
            return vortexWorkerManager.removed();
          } else {
            // Called before addWorker (e.g. RM preempted the resource before the Evaluator started)
            removedBeforeAddedWorkers.add(id);
            return new ArrayList<>(0);
          }
        }
      } finally {
        writeLock.unlock();
      }
    }

    // No need to return anything since it is terminated
    return new ArrayList<>(0);
  }

  /**
   * Launches the tasklet on a randomly chosen running worker, waiting until at least one worker
   * is running. Does nothing if this instance is (or becomes, while waiting) terminated.
   *
   * Concurrency: Called by single scheduler thread.
   * Parameter: Same tasklet can be launched multiple times.
   *
   * @throws RuntimeException wrapping the InterruptedException if interrupted while waiting for
   *         a worker; the thread's interrupt status is restored before throwing.
   */
  void launchTasklet(final Tasklet tasklet) {
    if (terminated) {
      return;
    }

    readLock.lock();
    // Tracks ownership so that the finally block never unlocks a read lock this thread does not hold
    boolean readLockHeld = true;
    try {
      if (!terminated && runningWorkers.isEmpty()) {
        // The read lock cannot be upgraded and has no Condition: release it and wait under the write lock
        readLock.unlock();
        readLockHeld = false;
        awaitRunningWorkerAndDowngrade();
        readLockHeld = true;
      }

      // terminated cannot flip while the read lock is held, since terminate() needs the write lock
      if (!terminated) {
        randomlyChooseWorker().launchTasklet(tasklet);
      }
    } finally {
      if (readLockHeld) {
        readLock.unlock();
      }
    }
  }

  /**
   * Waits under the write lock until a worker is running or this instance is terminated, then
   * downgrades to the read lock. On normal return the caller holds the read lock only; if an
   * exception is thrown the caller holds no lock.
   */
  private void awaitRunningWorkerAndDowngrade() {
    writeLock.lock();
    try {
      while (!terminated && runningWorkers.isEmpty()) {
        noRunningWorker.await();
      }
      readLock.lock(); // Downgrade: acquire read before releasing write
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Forwards a tasklet result to the worker it came from, if that worker is still registered.
   *
   * Concurrency: Called by multiple threads.
   * Parameter: Same arguments can come in multiple times.
   * (e.g. preemption message coming before tasklet completion message multiple times)
   */
  void completeTasklet(final String workerId,
                       final int taskletId,
                       final Serializable result) {
    if (!terminated) {
      readLock.lock();
      try {
        if (!terminated) { // Re-check under the lock: terminate() may have run in between
          if (runningWorkers.containsKey(workerId)) { // Preemption can come before
            this.runningWorkers.get(workerId).taskletCompleted(taskletId, result);
          }
          return;
        }
      } finally {
        readLock.unlock();
      }
    }
  }

  /**
   * Forwards a tasklet failure to the worker it came from, if that worker is still registered.
   *
   * Concurrency: Called by multiple threads.
   * Parameter: Same arguments can come in multiple times.
   * (e.g. preemption message coming before tasklet error message multiple times)
   */
  void errorTasklet(final String workerId,
                    final int taskletId,
                    final Exception exception) {
    if (!terminated) {
      readLock.lock();
      try {
        if (!terminated) { // Re-check under the lock: terminate() may have run in between
          if (runningWorkers.containsKey(workerId)) { // Preemption can come before
            this.runningWorkers.get(workerId).taskletThrewException(taskletId, exception);
          }
          return;
        }
      } finally {
        readLock.unlock();
      }
    }
  }

  /**
   * Marks this instance terminated, terminates and forgets every registered worker, and wakes
   * up a scheduler that may be waiting for a worker in {@link #launchTasklet(Tasklet)}.
   *
   * @throws RuntimeException if this instance was already terminated.
   */
  void terminate() {
    if (!terminated) {
      writeLock.lock();
      try {
        if (!terminated) { // Re-check under the lock: another terminate() may have run in between
          terminated = true;
          for (final VortexWorkerManager vortexWorkerManager : runningWorkers.values()) {
            vortexWorkerManager.terminate();
          }
          runningWorkers.clear();

          // Without this, a scheduler blocked on "no running worker" would never observe termination
          noRunningWorker.signalAll();
          return;
        }
      } finally {
        writeLock.unlock();
      }
    }
    throw new RuntimeException("Attempting to terminate an already terminated RunningWorkers");
  }

  boolean isTerminated() {
    return terminated;
  }

  /**
   * Picks one registered worker uniformly at random. Must only be called while at least one
   * worker is registered and the map cannot change (i.e. under the read lock).
   */
  private VortexWorkerManager randomlyChooseWorker() {
    final Collection<VortexWorkerManager> workers = runningWorkers.values();
    final int index = random.nextInt(workers.size());
    int i = 0;
    for (final VortexWorkerManager vortexWorkerManager : workers) {
      if (i == index) {
        return vortexWorkerManager;
      } else {
        i++;
      }
    }
    throw new RuntimeException("Bad Index");
  }

  ///////////////////////////////////////// For Tests Only

  /**
   * For unit tests to check whether the worker is running.
   */
  boolean isWorkerRunning(final String workerId) {
    return runningWorkers.containsKey(workerId);
  }

  /**
   * For unit tests to see where a tasklet is scheduled to.
   * @param taskletId id of the tasklet in question
   * @return id of the worker (null if the tasklet was not scheduled to any worker)
   */
  String getWhereTaskletWasScheduledTo(final int taskletId) {
    for (final Map.Entry<String, VortexWorkerManager> entry : runningWorkers.entrySet()) {
      final String workerId = entry.getKey();
      final VortexWorkerManager vortexWorkerManager = entry.getValue();
      if (vortexWorkerManager.containsTasklet(taskletId)) {
        return workerId;
      }
    }
    return null;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java new file mode 100644 index 0000000..5d64c57 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.reef.vortex.driver;

import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.vortex.api.VortexFunction;
import org.apache.reef.vortex.api.VortexFuture;

import java.io.Serializable;

/**
 * Driver-side representation of one user task: its id, the user function, the input and
 * the future through which the outcome is handed back to the user.
 *
 * @param <TInput> serializable type of the input.
 * @param <TOutput> serializable type of the result.
 */
@DriverSide
class Tasklet<TInput extends Serializable, TOutput extends Serializable> implements Serializable {
  // Field names are left untouched on purpose: this type is Serializable.
  private final int taskletId;
  private final VortexFunction<TInput, TOutput> userTask;
  private final TInput input;
  private final VortexFuture<TOutput> vortexFuture;

  Tasklet(final int taskletId,
          final VortexFunction<TInput, TOutput> userTask,
          final TInput input,
          final VortexFuture<TOutput> vortexFuture) {
    this.taskletId = taskletId;
    this.userTask = userTask;
    this.input = input;
    this.vortexFuture = vortexFuture;
  }

  /**
   * @return id of the tasklet
   */
  int getId() {
    return this.taskletId;
  }

  /**
   * @return the input of the tasklet
   */
  TInput getInput() {
    return this.input;
  }

  /**
   * @return the user function of the tasklet
   */
  VortexFunction<TInput, TOutput> getUserFunction() {
    return this.userTask;
  }

  /**
   * Called by VortexMaster to let the user know that the task completed:
   * passes the result on to the future.
   */
  void completed(final TOutput result) {
    this.vortexFuture.completed(result);
  }

  /**
   * Called by VortexMaster to let the user know that the task threw an exception:
   * passes the exception on to the future.
   */
  void threwException(final Exception exception) {
    this.vortexFuture.threwException(exception);
  }

  /**
   * For tests.
   *
   * @return whether the future reports itself as done.
   */
  boolean isCompleted() {
    return this.vortexFuture.isDone();
  }

  /**
   * @return description of the tasklet in string.
   */
  @Override
  public String toString() {
    return "Tasklet: " + this.taskletId;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java new file mode 100644 index 0000000..a7b9106 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */
package org.apache.reef.vortex.driver;

import org.apache.reef.annotations.Unstable;
import org.apache.reef.client.DriverConfiguration;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Configurations;
import org.apache.reef.util.EnvironmentUtils;
import org.apache.reef.vortex.api.VortexStart;

/**
 * Static helper that assembles the configuration of a Vortex job.
 */
@Unstable
public final class VortexConfHelper {
  // The number of VortexStart threads is fixed to 1 for now
  private static final int DEFAULT_NUM_OF_VORTEX_START_THREADS = 1;

  // Not instantiable: static helper only
  private VortexConfHelper() {
  }

  /**
   * Builds the driver configuration (event handlers, job name, libraries) and the Vortex master
   * configuration (worker count/memory/cores, user start class) and merges the two.
   *
   * @param jobName used as the driver identifier.
   * @param vortexStart user class that starts the Vortex job.
   * @param numOfWorkers value for {@code VortexMasterConf.WORKER_NUM}.
   * @param workerMemory value for {@code VortexMasterConf.WORKER_MEM}.
   * @param workerCores value for {@code VortexMasterConf.WORKER_CORES}.
   * @return Configuration for Vortex job.
   */
  public static Configuration getVortexConf(final String jobName,
                                            final Class<? extends VortexStart> vortexStart,
                                            final int numOfWorkers,
                                            final int workerMemory,
                                            final int workerCores) {
    final Configuration driverConf = DriverConfiguration.CONF
        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(VortexDriver.class))
        .set(DriverConfiguration.ON_DRIVER_STARTED, VortexDriver.StartHandler.class)
        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, VortexDriver.AllocatedEvaluatorHandler.class)
        .set(DriverConfiguration.ON_TASK_RUNNING, VortexDriver.RunningTaskHandler.class)
        .set(DriverConfiguration.ON_TASK_MESSAGE, VortexDriver.TaskMessageHandler.class)
        .set(DriverConfiguration.ON_EVALUATOR_FAILED, VortexDriver.FailedEvaluatorHandler.class)
        .set(DriverConfiguration.DRIVER_IDENTIFIER, jobName)
        .build();

    final Configuration masterConf = VortexMasterConf.CONF
        .set(VortexMasterConf.WORKER_NUM, numOfWorkers)
        .set(VortexMasterConf.WORKER_MEM, workerMemory)
        .set(VortexMasterConf.WORKER_CORES, workerCores)
        .set(VortexMasterConf.VORTEX_START, vortexStart)
        .set(VortexMasterConf.NUM_OF_VORTEX_START_THERAD, DEFAULT_NUM_OF_VORTEX_START_THREADS)
        .build();

    return Configurations.merge(driverConf, masterConf);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java new file mode 100644 index 0000000..4dedb1d --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.vortex.driver; + +import org.apache.commons.lang.SerializationUtils; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.driver.evaluator.*; +import org.apache.reef.driver.task.RunningTask; +import org.apache.reef.driver.task.TaskConfiguration; +import org.apache.reef.driver.task.TaskMessage; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Configurations; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.annotations.Unit; +import org.apache.reef.vortex.api.VortexStart; +import org.apache.reef.vortex.common.TaskletFailureReport; +import org.apache.reef.vortex.common.TaskletResultReport; +import org.apache.reef.vortex.common.WorkerReport; +import org.apache.reef.vortex.evaluator.VortexWorker; +import org.apache.reef.wake.EStage; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.impl.SingleThreadStage; +import org.apache.reef.wake.time.event.StartTime; + +import javax.inject.Inject; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * REEF Driver for Vortex. 
+ */
@Unit
@DriverSide
final class VortexDriver {
  private static final Logger LOG = Logger.getLogger(VortexDriver.class.getName());
  private static final int MAX_NUM_OF_FAILURES = 5;
  private static final int SCHEDULER_EVENT = 0; // Dummy number to comply with onNext() interface
  // Appended to the evaluator id to form the id of the worker task submitted to that evaluator
  private static final String WORKER_ID_SUFFIX = "_vortex_worker";

  private final AtomicInteger numberOfFailures = new AtomicInteger(0);
  private final EvaluatorRequestor evaluatorRequestor; // for requesting resources
  private final VortexMaster vortexMaster; // Vortex Master
  private final VortexRequestor vortexRequestor; // For sending Commands to remote workers

  // Resource configuration for single thread pool
  private final int evalMem;
  private final int evalNum;
  private final int evalCores;

  private final EStage<VortexStart> vortexStartEStage;
  private final VortexStart vortexStart;
  private final EStage<Integer> pendingTaskletSchedulerEStage;

  @Inject
  private VortexDriver(final EvaluatorRequestor evaluatorRequestor,
                       final VortexRequestor vortexRequestor,
                       final VortexMaster vortexMaster,
                       final EStage<VortexStart> vortexStartEStage,
                       final VortexStart vortexStart,
                       final PendingTaskletScheduler pendingTaskletScheduler,
                       @Parameter(VortexMasterConf.WorkerMem.class) final int workerMem,
                       @Parameter(VortexMasterConf.WorkerNum.class) final int workerNum,
                       @Parameter(VortexMasterConf.WorkerCores.class) final int workerCores) {
    this.vortexStartEStage = vortexStartEStage;
    this.vortexStart = vortexStart;
    this.pendingTaskletSchedulerEStage = new SingleThreadStage<>(pendingTaskletScheduler, 1);
    this.evaluatorRequestor = evaluatorRequestor;
    this.vortexMaster = vortexMaster;
    this.vortexRequestor = vortexRequestor;
    this.evalMem = workerMem;
    this.evalNum = workerNum;
    this.evalCores = workerCores;
  }

  /**
   * Driver started: requests the initial evaluators, then hands the user's VortexStart and the
   * scheduler event to their respective stages.
   */
  final class StartHandler implements EventHandler<StartTime> {
    @Override
    public void onNext(final StartTime startTime) {
      // Initial Evaluator Request
      evaluatorRequestor.submit(EvaluatorRequest.newBuilder()
          .setNumber(evalNum)
          .setMemory(evalMem)
          .setNumberOfCores(evalCores)
          .build());

      // Run Vortex Start
      vortexStartEStage.onNext(vortexStart);

      // Run Scheduler
      pendingTaskletSchedulerEStage.onNext(SCHEDULER_EVENT);
    }
  }

  /**
   * Container allocated: submits a VortexWorker task to it.
   */
  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
    @Override
    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
      LOG.log(Level.INFO, "Container allocated");
      final String workerId = allocatedEvaluator.getId() + WORKER_ID_SUFFIX;

      final Configuration workerConfiguration = VortexWorkerConf.CONF
          .set(VortexWorkerConf.NUM_OF_THREADS, evalCores) // NUM_OF_THREADS = evalCores
          .build();

      final Configuration taskConfiguration = TaskConfiguration.CONF
          .set(TaskConfiguration.IDENTIFIER, workerId)
          .set(TaskConfiguration.TASK, VortexWorker.class)
          .set(TaskConfiguration.ON_SEND_MESSAGE, VortexWorker.class)
          .set(TaskConfiguration.ON_MESSAGE, VortexWorker.DriverMessageHandler.class)
          .set(TaskConfiguration.ON_CLOSE, VortexWorker.TaskCloseHandler.class)
          .build();

      allocatedEvaluator.submitTask(Configurations.merge(workerConfiguration, taskConfiguration));
    }
  }

  /**
   * Evaluator up and running: registers the worker with the master.
   */
  final class RunningTaskHandler implements EventHandler<RunningTask> {
    @Override
    public void onNext(final RunningTask reefTask) {
      LOG.log(Level.INFO, "Worker up and running");
      vortexMaster.workerAllocated(new VortexWorkerManager(vortexRequestor, reefTask));
    }
  }

  /**
   * Message received: deserializes the worker report and dispatches it to the master by type.
   */
  final class TaskMessageHandler implements EventHandler<TaskMessage> {
    @Override
    public void onNext(final TaskMessage taskMessage) {
      final String workerId = taskMessage.getId();
      final WorkerReport workerReport = (WorkerReport) SerializationUtils.deserialize(taskMessage.get());
      switch (workerReport.getType()) {
      case TaskletResult:
        final TaskletResultReport<?> taskletResultReport = (TaskletResultReport<?>) workerReport;
        vortexMaster.taskletCompleted(workerId, taskletResultReport.getTaskletId(), taskletResultReport.getResult());
        break;
      case TaskletFailure:
        final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport;
        vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletId(),
            taskletFailureReport.getException());
        break;
      default:
        throw new RuntimeException("Unknown Report");
      }
    }
  }

  /**
   * Evaluator preempted: requests a replacement evaluator and tells the master which worker was
   * lost. Throws once the number of failures reaches MAX_NUM_OF_FAILURES.
   * TODO[REEF-501]: Distinguish different types of FailedEvaluator in Vortex.
   */
  final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
    @Override
    public void onNext(final FailedEvaluator failedEvaluator) {
      LOG.log(Level.INFO, "Evaluator preempted");
      if (numberOfFailures.incrementAndGet() >= MAX_NUM_OF_FAILURES) {
        throw new RuntimeException("Exceeded max number of failures");
      }

      // We request a new evaluator to take the place of the preempted one
      evaluatorRequestor.submit(EvaluatorRequest.newBuilder()
          .setNumber(1)
          .setMemory(evalMem)
          .setNumberOfCores(evalCores)
          .build());

      // The evaluator may fail before its worker task ever ran, in which case there is no failed
      // task to ask for an id. Fall back to the id AllocatedEvaluatorHandler assigns to the task.
      // NOTE(review): assumes failedEvaluator.getId() equals the id of the evaluator as seen at allocation.
      final String workerId = failedEvaluator.getFailedTask().isPresent()
          ? failedEvaluator.getFailedTask().get().getId()
          : failedEvaluator.getId() + WORKER_ID_SUFFIX;
      vortexMaster.workerPreempted(workerId);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexLauncher.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexLauncher.java
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexLauncher.java new file mode 100644 index 0000000..b288824 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexLauncher.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.driver; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.client.DriverLauncher; +import org.apache.reef.client.LauncherStatus; +import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.vortex.api.VortexStart; + +/** + * Launches a Vortex Job. + */ +@Unstable +public final class VortexLauncher { + private VortexLauncher() { + } + + private static final int MAX_NUMBER_OF_EVALUATORS = 10; + + /** + * Launch a Vortex job using the local runtime. + */ + public static LauncherStatus launchLocal(final String jobName, + final Class<? 
extends VortexStart> vortexUserCode, + final int numOfWorkers, + final int workerMemory, + final int workerCores) { + final Configuration runtimeConf = LocalRuntimeConfiguration.CONF + .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS) + .build(); + final Configuration vortexConf = + VortexConfHelper.getVortexConf(jobName, vortexUserCode, numOfWorkers, workerMemory, workerCores); + return launch(runtimeConf, vortexConf); + } + + private static LauncherStatus launch(final Configuration runtimeConf, final Configuration vortexConf) { + try { + return DriverLauncher.getLauncher(runtimeConf).run(vortexConf); + } catch (InjectionException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java new file mode 100644 index 0000000..c9da30d --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.driver; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.vortex.api.VortexFunction; +import org.apache.reef.vortex.api.VortexFuture; + +import java.io.Serializable; + +/** + * The heart of Vortex. + * Processes various tasklet related events/requests coming from different components of the system. + */ +@Unstable +@DriverSide +@DefaultImplementation(DefaultVortexMaster.class) +public interface VortexMaster { + /** + * Submit a new Tasklet to be run sometime in the future. + */ + <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput> + enqueueTasklet(final VortexFunction<TInput, TOutput> vortexFunction, final TInput input); + + /** + * Call this when a new worker is up and running. + */ + void workerAllocated(final VortexWorkerManager vortexWorkerManager); + + /** + * Call this when a worker is preempted. + */ + void workerPreempted(final String id); + + /** + * Call this when a Tasklet is completed. + */ + void taskletCompleted(final String workerId, final int taskletId, final Serializable result); + + /** + * Call this when a Tasklet errored. + */ + void taskletErrored(final String workerId, final int taskletId, final Exception exception); + + /** + * Release all resources and shut down. 
+ */ + void terminate(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java new file mode 100644 index 0000000..41b7ab2 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.vortex.driver; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; +import org.apache.reef.tang.formats.ConfigurationModule; +import org.apache.reef.tang.formats.ConfigurationModuleBuilder; +import org.apache.reef.tang.formats.RequiredImpl; +import org.apache.reef.tang.formats.RequiredParameter; +import org.apache.reef.vortex.api.VortexStart; +import org.apache.reef.wake.EStage; +import org.apache.reef.wake.StageConfiguration; +import org.apache.reef.wake.impl.ThreadPoolStage; + +/** + * Vortex Master configuration. + */ +@Unstable +@DriverSide +public final class VortexMasterConf extends ConfigurationModuleBuilder { + /** + * Number of Workers. + */ + @NamedParameter(doc = "Number of Workers") + final class WorkerNum implements Name<Integer> { + } + + /** + * Worker Memory. + */ + @NamedParameter(doc = "Worker Memory") + final class WorkerMem implements Name<Integer> { + } + + /** + * Worker Cores. + */ + @NamedParameter(doc = "Worker Cores") + final class WorkerCores implements Name<Integer> { + } + + /** + * Number of Workers. + */ + public static final RequiredParameter<Integer> WORKER_NUM = new RequiredParameter<>(); + + /** + * Worker Memory. + */ + public static final RequiredParameter<Integer> WORKER_MEM = new RequiredParameter<>(); + + /** + * Worker Cores. + */ + public static final RequiredParameter<Integer> WORKER_CORES = new RequiredParameter<>(); + + /** + * Vortex Start. + */ + public static final RequiredImpl<VortexStart> VORTEX_START = new RequiredImpl<>(); + + /** + * Number of Vortex Start threads. + */ + public static final RequiredParameter<Integer> NUM_OF_VORTEX_START_THERAD = new RequiredParameter<>(); + + /** + * Vortex Master configuration. 
+ */ + public static final ConfigurationModule CONF = new VortexMasterConf() + .bindNamedParameter(WorkerNum.class, WORKER_NUM) + .bindNamedParameter(WorkerMem.class, WORKER_MEM) + .bindNamedParameter(WorkerCores.class, WORKER_CORES) + .bindImplementation(VortexStart.class, VORTEX_START) + .bindNamedParameter(StageConfiguration.NumberOfThreads.class, NUM_OF_VORTEX_START_THERAD) + .bindNamedParameter(StageConfiguration.StageHandler.class, VortexStartExecutor.class) + .bindImplementation(EStage.class, ThreadPoolStage.class) + .build(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java new file mode 100644 index 0000000..2c02d6e --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.driver; + +import org.apache.commons.lang.SerializationUtils; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.driver.task.RunningTask; +import org.apache.reef.vortex.common.VortexRequest; + +import javax.inject.Inject; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Takes the serialization work from the scheduler thread. + */ +@DriverSide +class VortexRequestor { + private final ExecutorService executorService = Executors.newCachedThreadPool(); + + @Inject + VortexRequestor() { + } + + void send(final RunningTask reefTask, final VortexRequest vortexRequest) { + executorService.execute(new Runnable() { + @Override + public void run() { + // Possible race condition with VortexWorkerManager#terminate is addressed by the global lock in VortexMaster + reefTask.send(SerializationUtils.serialize(vortexRequest)); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2c994ec9/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexStartExecutor.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexStartExecutor.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexStartExecutor.java new file mode 100644 index 0000000..6a5ae4b --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexStartExecutor.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.driver; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.vortex.api.VortexStart; +import org.apache.reef.vortex.api.VortexThreadPool; +import org.apache.reef.wake.EventHandler; + +import javax.inject.Inject; + +/** + * Thread pool that executes VortexStart. + */ +@DriverSide +final class VortexStartExecutor implements EventHandler<VortexStart> { + private final VortexThreadPool vortexThreadPool; // ThreadPool-like API for VortexStart to use + private final VortexMaster vortexMaster; + + @Inject + private VortexStartExecutor(final VortexThreadPool vortexThreadPool, + final VortexMaster vortexMaster) { + this.vortexThreadPool = vortexThreadPool; + this.vortexMaster = vortexMaster; + } + + /** + * Execute the passed VortexStart and terminate right after. + * @param vortexStart to execute + */ + @Override + public void onNext(final VortexStart vortexStart) { + vortexStart.start(vortexThreadPool); + vortexMaster.terminate(); + } +} \ No newline at end of file
