Repository: incubator-reef
Updated Branches:
refs/heads/master 7395bc681 -> 59489156c
[REEF-587] Create a default implementation for DriverRestartManager
This addresses the issue by:
* Changing DriverRestartManager from an interface to a final class.
* Removing optional DriverRestartManager parameters.
* Adding a default DriverRuntimeRestartManager that fails on any
restart-related operations.
JIRA:
[REEF-587](https://issues.apache.org/jira/browse/REEF-587)
Pull Request:
This closes #371
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/59489156
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/59489156
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/59489156
Branch: refs/heads/master
Commit: 59489156c6d42f24997217cd33e9f047e568dd8c
Parents: 7395bc6
Author: Andrew Chung <[email protected]>
Authored: Thu Aug 13 16:48:43 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Aug 14 13:05:09 2015 -0700
----------------------------------------------------------------------
.../DefaultDriverRuntimeRestartMangerImpl.java | 70 ++++++++++
.../driver/restart/DriverRestartManager.java | 90 +++++++++++--
.../restart/DriverRestartManagerImpl.java | 132 -------------------
.../driver/restart/DriverRestartUtilities.java | 16 +--
.../restart/DriverRuntimeRestartManager.java | 4 +-
.../DriverRuntimeRestartConfiguration.java | 1 -
.../common/driver/DriverStartHandler.java | 57 ++------
.../driver/context/ContextRepresenters.java | 14 +-
.../driver/evaluator/EvaluatorManager.java | 66 +---------
.../common/driver/task/TaskRepresenter.java | 4 +-
10 files changed, 179 insertions(+), 275 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/59489156/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
new file mode 100644
index 0000000..312c183
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.exception.DriverFatalRuntimeException;
+
+import javax.inject.Inject;
+import java.util.Set;
+
+/**
+ * The default driver runtime restart manager that is not able to perform any
restart actions.
+ * Thus, when performing actions pertaining to restart, it is recommended to
call static functions in
+ * {@link DriverRestartUtilities} or call isRestart() first.
+ */
+@Private
+@DriverSide
+@Unstable
+final class DefaultDriverRuntimeRestartMangerImpl implements
DriverRuntimeRestartManager {
+ @Inject
+ private DefaultDriverRuntimeRestartMangerImpl() {
+ }
+
+ @Override
+ public boolean isRestart() {
+ return false;
+ }
+
+ @Override
+ public void recordAllocatedEvaluator(final String id) {
+ throw new DriverFatalRuntimeException(
+ "Restart is not enabled. recordAllocatedEvaluator should not have been
called.");
+ }
+
+ @Override
+ public void recordRemovedEvaluator(final String id) {
+ throw new DriverFatalRuntimeException(
+ "Restart is not enabled. recordRemovedEvaluator should not have been
called.");
+ }
+
+ @Override
+ public EvaluatorRestartInfo getAliveAndFailedEvaluators() {
+ throw new DriverFatalRuntimeException(
+ "Restart is not enabled. getAliveAndFailedEvaluators should not have
been called.");
+ }
+
+ @Override
+ public void informAboutEvaluatorFailures(final Set<String>
failedEvaluatorIds) {
+ throw new DriverFatalRuntimeException(
+ "Restart is not enabled. informAboutEvaluatorFailures should not have
been called.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/59489156/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
index 57c31c8..c517b04 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
@@ -21,8 +21,14 @@ package org.apache.reef.driver.restart;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.exception.DriverFatalRuntimeException;
+import javax.inject.Inject;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
* The manager that handles aspects of driver restart such as determining
whether the driver is in
@@ -31,29 +37,61 @@ import java.util.Set;
@DriverSide
@Private
@Unstable
-public interface DriverRestartManager {
+public final class DriverRestartManager {
+ private static final Logger LOG =
Logger.getLogger(DriverRestartManager.class.getName());
+ private final DriverRuntimeRestartManager driverRuntimeRestartManager;
+ private final Set<String> previousEvaluators;
+ private final Set<String> recoveredEvaluators;
+ private DriverRestartState state;
+
+ @Inject
+ private DriverRestartManager(final DriverRuntimeRestartManager
driverRuntimeRestartManager) {
+ this.driverRuntimeRestartManager = driverRuntimeRestartManager;
+ this.state = DriverRestartState.NotRestarted;
+ this.previousEvaluators = new HashSet<>();
+ this.recoveredEvaluators = new HashSet<>();
+ }
/**
* @return Whether or not the driver instance is a restarted instance.
*/
- boolean isRestart();
+ public synchronized boolean isRestart() {
+ if (this.state.isRestart()) {
+ return true;
+ }
+
+ if (driverRuntimeRestartManager.isRestart()) {
+ this.state = DriverRestartState.RestartBegan;
+ return true;
+ }
+
+ return false;
+ }
/**
* Recovers the list of alive and failed evaluators and inform about
evaluator failures
* based on the specific runtime. Also sets the expected amount of
evaluators to report back
* as alive to the job driver.
*/
- void onRestart();
+ public synchronized void onRestart() {
+ final EvaluatorRestartInfo evaluatorRestartInfo =
driverRuntimeRestartManager.getAliveAndFailedEvaluators();
+ setPreviousEvaluatorIds(evaluatorRestartInfo.getAliveEvaluators());
+
driverRuntimeRestartManager.informAboutEvaluatorFailures(evaluatorRestartInfo.getFailedEvaluators());
+ }
/**
* @return whether restart is completed.
*/
- boolean isRestartCompleted();
+ public synchronized boolean isRestartCompleted() {
+ return this.state == DriverRestartState.RestartCompleted;
+ }
/**
* @return the Evaluators expected to check in from a previous run.
*/
- Set<String> getPreviousEvaluatorIds();
+ public synchronized Set<String> getPreviousEvaluatorIds() {
+ return Collections.unmodifiableSet(this.previousEvaluators);
+ }
/**
* Set the Evaluators to expect still active from a previous execution of
the Driver in a restart situation.
@@ -61,29 +99,61 @@ public interface DriverRestartManager {
*
* @param ids the evaluator IDs of the evaluators that are expected to have
survived driver restart.
*/
- void setPreviousEvaluatorIds(final Set<String> ids);
+ public synchronized void setPreviousEvaluatorIds(final Set<String> ids) {
+ if (this.state != DriverRestartState.RestartInProgress) {
+ previousEvaluators.addAll(ids);
+ this.state = DriverRestartState.RestartInProgress;
+ } else {
+ final String errMsg = "Should not be setting the set of expected alive
evaluators more than once.";
+ LOG.log(Level.SEVERE, errMsg);
+ throw new DriverFatalRuntimeException(errMsg);
+ }
+ }
/**
* @return the IDs of the Evaluators from a previous Driver that have
checked in with the Driver
* in a restart situation.
*/
- Set<String> getRecoveredEvaluatorIds();
+ public synchronized Set<String> getRecoveredEvaluatorIds() {
+ return Collections.unmodifiableSet(this.recoveredEvaluators);
+ }
/**
* Indicate that this Driver has re-established the connection with one more
Evaluator of a previous run.
* @return true if the driver restart is completed.
*/
- boolean evaluatorRecovered(final String id);
+ public synchronized boolean onRecoverEvaluatorIsRestartComplete(final String
evaluatorId) {
+ if (!this.previousEvaluators.contains(evaluatorId)) {
+ final String errMsg = "Evaluator with evaluator ID " + evaluatorId + "
not expected to be alive.";
+ LOG.log(Level.SEVERE, errMsg);
+ throw new DriverFatalRuntimeException(errMsg);
+ }
+
+ if (!this.recoveredEvaluators.add(evaluatorId)) {
+ LOG.log(Level.WARNING, "Evaluator with evaluator ID " + evaluatorId + "
added to the set" +
+ " of recovered evaluators more than once. Ignoring second add...");
+ }
+
+ if (this.recoveredEvaluators.containsAll(this.previousEvaluators)) {
+ this.state = DriverRestartState.RestartCompleted;
+ }
+
+ return this.state == DriverRestartState.RestartCompleted;
+ }
/**
* Records the evaluators when it is allocated. The implementation depends
on the runtime.
* @param id The evaluator ID of the allocated evaluator.
*/
- void recordAllocatedEvaluator(final String id);
+ public synchronized void recordAllocatedEvaluator(final String id) {
+ driverRuntimeRestartManager.recordAllocatedEvaluator(id);
+ }
/**
* Records a removed evaluator into the evaluator log. The implementation
depends on the runtime.
* @param id The evaluator ID of the removed evaluator.
*/
- void recordRemovedEvaluator(final String id);
+ public synchronized void recordRemovedEvaluator(final String id) {
+ driverRuntimeRestartManager.recordRemovedEvaluator(id);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/59489156/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
deleted file mode 100644
index 4ea3b56..0000000
---
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.driver.restart;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.exception.DriverFatalRuntimeException;
-
-import javax.inject.Inject;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * The implementation of DriverRestartManager. A few methods here are proxy
methods for
- * the DriverRuntimeRestartManager that depends on the runtime implementation.
- */
-@DriverSide
-@Private
-@Unstable
-public final class DriverRestartManagerImpl implements DriverRestartManager {
- private static final Logger LOG =
Logger.getLogger(DriverRestartManagerImpl.class.getName());
- private final DriverRuntimeRestartManager driverRuntimeRestartManager;
- private final Set<String> previousEvaluators;
- private final Set<String> recoveredEvaluators;
- private DriverRestartState state;
-
- @Inject
- private DriverRestartManagerImpl(final DriverRuntimeRestartManager
driverRuntimeRestartManager) {
- this.driverRuntimeRestartManager = driverRuntimeRestartManager;
- this.state = DriverRestartState.NotRestarted;
- this.previousEvaluators = new HashSet<>();
- this.recoveredEvaluators = new HashSet<>();
- }
-
- @Override
- public synchronized boolean isRestart() {
- if (this.state.isRestart()) {
- return true;
- }
-
- if (driverRuntimeRestartManager.isRestart()) {
- this.state = DriverRestartState.RestartBegan;
- return true;
- }
-
- return false;
- }
-
- @Override
- public synchronized void onRestart() {
- final EvaluatorRestartInfo evaluatorRestartInfo =
driverRuntimeRestartManager.getAliveAndFailedEvaluators();
- setPreviousEvaluatorIds(evaluatorRestartInfo.getAliveEvaluators());
-
driverRuntimeRestartManager.informAboutEvaluatorFailures(evaluatorRestartInfo.getFailedEvaluators());
- }
-
- @Override
- public synchronized boolean isRestartCompleted() {
- return this.state == DriverRestartState.RestartCompleted;
- }
-
- @Override
- public synchronized Set<String> getPreviousEvaluatorIds() {
- return Collections.unmodifiableSet(this.previousEvaluators);
- }
-
- @Override
- public synchronized void setPreviousEvaluatorIds(final Set<String> ids) {
- if (this.state != DriverRestartState.RestartInProgress) {
- previousEvaluators.addAll(ids);
- this.state = DriverRestartState.RestartInProgress;
- } else {
- final String errMsg = "Should not be setting the set of expected alive
evaluators more than once.";
- LOG.log(Level.SEVERE, errMsg);
- throw new DriverFatalRuntimeException(errMsg);
- }
- }
-
- @Override
- public synchronized Set<String> getRecoveredEvaluatorIds() {
- return Collections.unmodifiableSet(this.previousEvaluators);
- }
-
- @Override
- public synchronized boolean evaluatorRecovered(final String evaluatorId) {
- if (!this.previousEvaluators.contains(evaluatorId)) {
- final String errMsg = "Evaluator with evaluator ID " + evaluatorId + "
not expected to be alive.";
- LOG.log(Level.SEVERE, errMsg);
- throw new DriverFatalRuntimeException(errMsg);
- }
-
- if (!this.recoveredEvaluators.add(evaluatorId)) {
- LOG.log(Level.WARNING, "Evaluator with evaluator ID " + evaluatorId + "
added to the set" +
- " of recovered evaluators more than once. Ignoring second add...");
- }
-
- if (this.recoveredEvaluators.containsAll(this.previousEvaluators)) {
- this.state = DriverRestartState.RestartCompleted;
- }
-
- return this.state == DriverRestartState.RestartCompleted;
- }
-
- @Override
- public void recordAllocatedEvaluator(final String id) {
- driverRuntimeRestartManager.recordAllocatedEvaluator(id);
- }
-
- @Override
- public void recordRemovedEvaluator(final String id) {
- driverRuntimeRestartManager.recordRemovedEvaluator(id);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/59489156/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java
index dcf753b..469a4d1 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java
@@ -21,10 +21,11 @@ package org.apache.reef.driver.restart;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.util.Optional;
/**
* A static utilities class for simplifying calls to driver restart manager.
+ * Functions here should always call driverRestartManager.isRestart() before performing any
+ * actual operations.
*/
@Private
@DriverSide
@@ -33,19 +34,6 @@ public final class DriverRestartUtilities {
/**
* Helper function for driver restart to determine whether an evaluator ID
is from an evaluator from the
- * previous application attempt. DriverRestartManager is optional here.
- */
- public static boolean isRestartAndIsPreviousEvaluator(final
Optional<DriverRestartManager> driverRestartManager,
- final String
evaluatorId) {
- if (!driverRestartManager.isPresent()) {
- return false;
- }
-
- return isRestartAndIsPreviousEvaluator(driverRestartManager.get(),
evaluatorId);
- }
-
- /**
- * Helper function for driver restart to determine whether an evaluator ID
is from an evaluator from the
* previous application attempt.
*/
public static boolean isRestartAndIsPreviousEvaluator(final
DriverRestartManager driverRestartManager,
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/59489156/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
index c581e93..607a031 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
@@ -22,6 +22,7 @@ import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.tang.annotations.DefaultImplementation;
import java.util.Set;
@@ -34,9 +35,10 @@ import java.util.Set;
@Private
@RuntimeAuthor
@Unstable
+@DefaultImplementation(DefaultDriverRuntimeRestartMangerImpl.class)
public interface DriverRuntimeRestartManager {
/**
- * Determines whether or not the driver has been restarted.
+ * Determines whether or not the driver has been restarted. The default
implementation always returns false.
*/
boolean isRestart();
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/59489156/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
index cbac9ea..0db7a54 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
@@ -43,7 +43,6 @@ public final class DriverRuntimeRestartConfiguration extends
ConfigurationModule
// Automatically sets preserve evaluators to true.
.bindNamedParameter(ResourceManagerPreserveEvaluators.class,
Boolean.toString(true))
- .bindImplementation(DriverRestartManager.class,
DriverRestartManagerImpl.class)
.bindSetEntry(ServiceEvaluatorAllocatedHandlers.class,
EvaluatorPreservingEvaluatorAllocatedHandler.class)
.bindSetEntry(ServiceEvaluatorFailedHandlers.class,
EvaluatorPreservingEvaluatorFailedHandler.class)
.bindSetEntry(ServiceEvaluatorCompletedHandlers.class,
EvaluatorPreservingEvaluatorCompletedHandler.class)
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/59489156/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
index 2811497..62e53b7 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
@@ -23,7 +23,6 @@ import
org.apache.reef.driver.parameters.ServiceDriverRestartedHandlers;
import org.apache.reef.driver.restart.DriverRestartManager;
import org.apache.reef.exception.DriverFatalRuntimeException;
import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.util.Optional;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.time.event.StartTime;
@@ -39,48 +38,31 @@ public final class DriverStartHandler implements
EventHandler<StartTime> {
private static final Logger LOG =
Logger.getLogger(DriverStartHandler.class.getName());
private final Set<EventHandler<StartTime>> startHandlers;
- private final Optional<Set<EventHandler<StartTime>>> restartHandlers;
- private final Optional<Set<EventHandler<StartTime>>> serviceRestartHandlers;
- private final Optional<DriverRestartManager> driverRestartManager;
+ private final Set<EventHandler<StartTime>> restartHandlers;
+ private final Set<EventHandler<StartTime>> serviceRestartHandlers;
+ private final DriverRestartManager driverRestartManager;
@Inject
DriverStartHandler(@Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class)
- final Set<EventHandler<StartTime>> startHandler,
+ final Set<EventHandler<StartTime>> startHandlers,
@Parameter(DriverRestartHandler.class)
final Set<EventHandler<StartTime>> restartHandlers,
@Parameter(ServiceDriverRestartedHandlers.class)
final Set<EventHandler<StartTime>> serviceRestartHandlers,
final DriverRestartManager driverRestartManager) {
- this(startHandler, Optional.of(restartHandlers),
Optional.of(serviceRestartHandlers),
- Optional.of(driverRestartManager));
- LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandlers
[{0}], RestartHandlers [{1}]," +
- "and ServiceRestartHandlers [{2}], with a restart manager.",
- new String[] {this.startHandlers.toString(),
this.restartHandlers.toString(),
- this.serviceRestartHandlers.toString()});
- }
-
- @Inject
-
DriverStartHandler(@Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class)
- final Set<EventHandler<StartTime>> startHandlers) {
- this(startHandlers, Optional.<Set<EventHandler<StartTime>>>empty(),
- Optional.<Set<EventHandler<StartTime>>>empty(),
Optional.<DriverRestartManager>empty());
- LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandlers
[{0}] and no restart.",
- this.startHandlers.toString());
- }
-
- private DriverStartHandler(final Set<EventHandler<StartTime>> startHandler,
- final Optional<Set<EventHandler<StartTime>>>
restartHandlers,
- final Optional<Set<EventHandler<StartTime>>>
serviceRestartHandlers,
- final Optional<DriverRestartManager>
driverRestartManager) {
- this.startHandlers = startHandler;
+ this.startHandlers = startHandlers;
this.restartHandlers = restartHandlers;
this.serviceRestartHandlers = serviceRestartHandlers;
this.driverRestartManager = driverRestartManager;
+ LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandlers
[{0}], RestartHandlers [{1}]," +
+ "and ServiceRestartHandlers [{2}].",
+ new String[] {this.startHandlers.toString(),
this.restartHandlers.toString(),
+ this.serviceRestartHandlers.toString()});
}
@Override
public void onNext(final StartTime startTime) {
- if (isRestart()) {
+ if (this.driverRestartManager.isRestart()) {
this.onRestart(startTime);
} else {
this.onStart(startTime);
@@ -88,18 +70,18 @@ public final class DriverStartHandler implements
EventHandler<StartTime> {
}
private void onRestart(final StartTime startTime) {
- if (this.driverRestartManager.isPresent() &&
this.restartHandlers.isPresent()) {
- for (EventHandler<StartTime> serviceRestartHandler :
this.serviceRestartHandlers.get()) {
+ if (this.restartHandlers.size() > 0) {
+ for (EventHandler<StartTime> serviceRestartHandler :
this.serviceRestartHandlers) {
serviceRestartHandler.onNext(startTime);
}
- for (EventHandler<StartTime> restartHandler :
this.restartHandlers.get()){
+ for (EventHandler<StartTime> restartHandler : this.restartHandlers){
restartHandler.onNext(startTime);
}
// This can only be called after calling client restart handlers because
REEF.NET
// JobDriver requires making this call to set up the InterOp handlers.
- this.driverRestartManager.get().onRestart();
+ this.driverRestartManager.onRestart();
} else {
throw new DriverFatalRuntimeException("Driver restart happened, but no
ON_DRIVER_RESTART handler is bound.");
}
@@ -110,15 +92,4 @@ public final class DriverStartHandler implements
EventHandler<StartTime> {
startHandler.onNext(startTime);
}
}
-
- /**
- * @return true, if the configurations enable restart and the Driver is in
fact being restarted.
- */
- private boolean isRestart() {
- if (this.driverRestartManager.isPresent()) {
- return this.driverRestartManager.get().isRestart();
- }
-
- return false;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/59489156/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
index a8d538e..41bdd43 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
@@ -45,7 +45,7 @@ public final class ContextRepresenters {
private final EvaluatorMessageDispatcher messageDispatcher;
private final ContextFactory contextFactory;
- private final Optional<DriverRestartManager> driverRestartManager;
+ private final DriverRestartManager driverRestartManager;
// Mutable fields
@GuardedBy("this")
@@ -55,20 +55,8 @@ public final class ContextRepresenters {
@Inject
private ContextRepresenters(final EvaluatorMessageDispatcher
messageDispatcher,
- final ContextFactory contextFactory) {
- this(messageDispatcher, contextFactory,
Optional.<DriverRestartManager>empty());
- }
-
- @Inject
- private ContextRepresenters(final EvaluatorMessageDispatcher
messageDispatcher,
final ContextFactory contextFactory,
final DriverRestartManager driverRestartManager)
{
- this(messageDispatcher, contextFactory, Optional.of(driverRestartManager));
- }
-
- private ContextRepresenters(final EvaluatorMessageDispatcher
messageDispatcher,
- final ContextFactory contextFactory,
- final Optional<DriverRestartManager>
driverRestartManager) {
this.messageDispatcher = messageDispatcher;
this.contextFactory = contextFactory;
this.driverRestartManager = driverRestartManager;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/59489156/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index 032c0cb..12060dd 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -21,6 +21,7 @@ package org.apache.reef.runtime.common.driver.evaluator;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.evaluator.CLRProcessFactory;
+import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders;
import org.apache.reef.driver.restart.DriverRestartManager;
import org.apache.reef.driver.restart.DriverRestartUtilities;
import org.apache.reef.tang.ConfigurationProvider;
@@ -29,7 +30,6 @@ import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
import org.apache.reef.driver.evaluator.JVMProcessFactory;
-import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders;
import org.apache.reef.driver.task.FailedTask;
import org.apache.reef.exception.EvaluatorException;
import org.apache.reef.exception.EvaluatorKilledByResourceManagerException;
@@ -104,7 +104,7 @@ public final class EvaluatorManager implements
Identifiable, AutoCloseable {
private final Set<ConfigurationProvider> evaluatorConfigurationProviders;
private final JVMProcessFactory jvmProcessFactory;
private final CLRProcessFactory clrProcessFactory;
- private final Optional<DriverRestartManager> driverRestartManager;
+ private final DriverRestartManager driverRestartManager;
// Mutable fields
private Optional<TaskRepresenter> task = Optional.empty();
@@ -130,63 +130,10 @@ public final class EvaluatorManager implements
Identifiable, AutoCloseable {
final LoggingScopeFactory loggingScopeFactory,
@Parameter(EvaluatorConfigurationProviders.class)
final Set<ConfigurationProvider> evaluatorConfigurationProviders,
- final JVMProcessFactory jvmProcessFactory,
- final CLRProcessFactory clrProcessFactory,
- final DriverRestartManager driverRestartManager) {
- this(clock, remoteManager, resourceReleaseHandler, resourceLaunchHandler, evaluatorId, evaluatorDescriptor,
- contextRepresenters, configurationSerializer, messageDispatcher, evaluatorControlHandler,
- contextControlHandler, stateManager, exceptionCodec, idlenessSource, loggingScopeFactory,
- evaluatorConfigurationProviders, jvmProcessFactory, clrProcessFactory, Optional.of(driverRestartManager));
- }
-
- @Inject
- private EvaluatorManager(
- final Clock clock,
- final RemoteManager remoteManager,
- final ResourceReleaseHandler resourceReleaseHandler,
- final ResourceLaunchHandler resourceLaunchHandler,
- @Parameter(EvaluatorIdentifier.class) final String evaluatorId,
- @Parameter(EvaluatorDescriptorName.class) final EvaluatorDescriptorImpl evaluatorDescriptor,
- final ContextRepresenters contextRepresenters,
- final ConfigurationSerializer configurationSerializer,
- final EvaluatorMessageDispatcher messageDispatcher,
- final EvaluatorControlHandler evaluatorControlHandler,
- final ContextControlHandler contextControlHandler,
- final EvaluatorStatusManager stateManager,
- final ExceptionCodec exceptionCodec,
- final EventHandlerIdlenessSource idlenessSource,
- final LoggingScopeFactory loggingScopeFactory,
- @Parameter(EvaluatorConfigurationProviders.class)
- final Set<ConfigurationProvider> evaluatorConfigurationProviders,
- final JVMProcessFactory jvmProcessFactory,
- final CLRProcessFactory clrProcessFactory) {
- this(clock, remoteManager, resourceReleaseHandler, resourceLaunchHandler, evaluatorId, evaluatorDescriptor,
- contextRepresenters, configurationSerializer, messageDispatcher, evaluatorControlHandler,
- contextControlHandler, stateManager, exceptionCodec, idlenessSource, loggingScopeFactory,
- evaluatorConfigurationProviders, jvmProcessFactory, clrProcessFactory, Optional.<DriverRestartManager>empty());
- }
-
- private EvaluatorManager(
- final Clock clock,
- final RemoteManager remoteManager,
- final ResourceReleaseHandler resourceReleaseHandler,
- final ResourceLaunchHandler resourceLaunchHandler,
- final String evaluatorId,
- final EvaluatorDescriptorImpl evaluatorDescriptor,
- final ContextRepresenters contextRepresenters,
- final ConfigurationSerializer configurationSerializer,
- final EvaluatorMessageDispatcher messageDispatcher,
- final EvaluatorControlHandler evaluatorControlHandler,
- final ContextControlHandler contextControlHandler,
- final EvaluatorStatusManager stateManager,
- final ExceptionCodec exceptionCodec,
- final EventHandlerIdlenessSource idlenessSource,
- final LoggingScopeFactory loggingScopeFactory,
- final Set<ConfigurationProvider> evaluatorConfigurationProviders,
// TODO: Eventually remove the factories when they are removed from AllocatedEvaluatorImpl
final JVMProcessFactory jvmProcessFactory,
final CLRProcessFactory clrProcessFactory,
- final Optional<DriverRestartManager> driverRestartManager) {
+ final DriverRestartManager driverRestartManager) {
this.contextRepresenters = contextRepresenters;
this.idlenessSource = idlenessSource;
LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: {0}", evaluatorId);
@@ -396,7 +343,8 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
this.evaluatorControlHandler.setRemoteID(evaluatorRID);
this.stateManager.setRunning();
- boolean restartCompleted = this.driverRestartManager.get().evaluatorRecovered(this.evaluatorId);
+ final boolean restartCompleted =
+ this.driverRestartManager.onRecoverEvaluatorIsRestartComplete(this.evaluatorId);
LOG.log(Level.FINE, "Received recovery heartbeat from evaluator {0}.",
this.evaluatorId);
@@ -405,8 +353,8 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
LOG.log(Level.INFO, "All expected evaluators checked in.");
} else {
LOG.log(Level.INFO, "Expecting [{0}], [{1}] have checked in.",
- new Object[]{this.driverRestartManager.get().getPreviousEvaluatorIds(),
- this.driverRestartManager.get().getRecoveredEvaluatorIds()});
+ new Object[]{this.driverRestartManager.getPreviousEvaluatorIds(),
+ this.driverRestartManager.getRecoveredEvaluatorIds()});
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/59489156/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
index 75dd24e..5436494 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
@@ -49,7 +49,7 @@ public final class TaskRepresenter {
private final EvaluatorManager evaluatorManager;
private final ExceptionCodec exceptionCodec;
private final String taskId;
- private final Optional<DriverRestartManager> driverRestartManager;
+ private final DriverRestartManager driverRestartManager;
// Mutable state
private ReefServiceProtos.State state = ReefServiceProtos.State.INIT;
@@ -59,7 +59,7 @@ public final class TaskRepresenter {
final EvaluatorMessageDispatcher messageDispatcher,
final EvaluatorManager evaluatorManager,
final ExceptionCodec exceptionCodec,
- final Optional<DriverRestartManager> driverRestartManager) {
+ final DriverRestartManager driverRestartManager) {
this.taskId = taskId;
this.context = context;
this.messageDispatcher = messageDispatcher;