Repository: incubator-reef
Updated Branches:
refs/heads/master fdda92bc7 -> 2cd1e4bf3
[REEF-687] Restructure handler for DriverRestart in Java
This addressed the issue by
* Created a `DriverRestarted` interface that provides the set of
Evaluator IDs for Evaluators that are expected to report back.
* Added a `ClrHandlersInitializer` interface that allows implementing
classes to initialize CLR EventHandlers based on Driver
condition.
* Change `DriverRestartHandlers` to implement
`EventHandler<DriverRestarted>`.
JIRA:
[REEF-687](https://issues.apache.org/jira/browse/REEF-687)
Pull Request:
This closes #438
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/2cd1e4bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/2cd1e4bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/2cd1e4bf
Branch: refs/heads/master
Commit: 2cd1e4bf3b4e3af53eabea6a92cd9ddc0218cb61
Parents: fdda92b
Author: Andrew Chung <[email protected]>
Authored: Fri Aug 28 21:02:32 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Aug 31 13:21:33 2015 -0700
----------------------------------------------------------------------
.../generic/ClrHandlersInitializer.java | 38 ++++++++++++
.../DriverRestartClrHandlersInitializer.java | 48 +++++++++++++++
.../DriverStartClrHandlersInitializer.java | 46 +++++++++++++++
.../reef/javabridge/generic/JobDriver.java | 18 +++---
.../reef/client/DriverRestartConfiguration.java | 4 +-
.../reef/client/DriverServiceConfiguration.java | 3 +-
.../driver/parameters/DriverRestartHandler.java | 11 ++--
.../ServiceDriverRestartedHandlers.java | 4 +-
.../driver/restart/DriverRestartManager.java | 16 +++--
.../reef/driver/restart/DriverRestarted.java | 47 +++++++++++++++
.../driver/restart/DriverRestartedImpl.java | 62 ++++++++++++++++++++
.../common/driver/DriverStartHandler.java | 23 ++++----
.../reef/examples/hello/HelloDriverRestart.java | 8 +--
.../reef/webserver/ReefEventStateManager.java | 5 +-
14 files changed, 292 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java
new file mode 100644
index 0000000..34968f9
--- /dev/null
+++
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge.generic;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.javabridge.EvaluatorRequestorBridge;
+
+/**
+ * An initializer interface that initializes ClrHandlers for the CLR {@link
JobDriver}.
+ */
+@DriverSide
+@Private
+@Unstable
+interface ClrHandlersInitializer {
+
+ /**
+ * Returns the set of CLR handles.
+ */
+ long[] getClrHandlers(final String portNumber, final
EvaluatorRequestorBridge evaluatorRequestorBridge);
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java
new file mode 100644
index 0000000..d17c502
--- /dev/null
+++
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge.generic;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.restart.DriverRestarted;
+import org.apache.reef.javabridge.EvaluatorRequestorBridge;
+import org.apache.reef.javabridge.NativeInterop;
+
+/**
+ * An initializer implementation that initializes ClrHandlers for the CLR
{@link JobDriver} for the case
+ * where the Driver has restarted.
+ */
+@Private
+@DriverSide
+@Unstable
+final class DriverRestartClrHandlersInitializer implements
ClrHandlersInitializer {
+ private final DriverRestarted driverRestarted;
+
+ DriverRestartClrHandlersInitializer(final DriverRestarted driverRestarted) {
+ this.driverRestarted = driverRestarted;
+ }
+
+ @Override
+ public long[] getClrHandlers(final String portNumber, final
EvaluatorRequestorBridge evaluatorRequestorBridge) {
+ // TODO[REEF-689]: Make callClrSystemOnRestartedHandlerOnNext take
DriverRestarted object.
+ return NativeInterop.callClrSystemOnRestartHandlerOnNext(
+ driverRestarted.getStartTime().toString(), portNumber,
evaluatorRequestorBridge);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java
new file mode 100644
index 0000000..117bfbd
--- /dev/null
+++
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge.generic;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.javabridge.EvaluatorRequestorBridge;
+import org.apache.reef.javabridge.NativeInterop;
+import org.apache.reef.wake.time.event.StartTime;
+
+/**
+ * An initializer implementation that initializes ClrHandlers for the CLR
{@link JobDriver} for the case
+ * of regular Driver startup.
+ */
+@Private
+@DriverSide
+@Unstable
+final class DriverStartClrHandlersInitializer implements
ClrHandlersInitializer {
+ private final StartTime startTime;
+
+ DriverStartClrHandlersInitializer(final StartTime startTime) {
+ this.startTime = startTime;
+ }
+
+ @Override
+ public long[] getClrHandlers(final String portNumber, final
EvaluatorRequestorBridge evaluatorRequestorBridge) {
+ return NativeInterop.callClrSystemOnStartHandler(startTime.toString(),
portNumber, evaluatorRequestorBridge);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
index ea93ebc..212918e 100644
---
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
+++
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
@@ -24,6 +24,7 @@ import org.apache.reef.driver.context.ClosedContext;
import org.apache.reef.driver.context.ContextMessage;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.*;
+import org.apache.reef.driver.restart.DriverRestarted;
import org.apache.reef.driver.task.*;
import org.apache.reef.io.network.naming.NameServer;
import org.apache.reef.javabridge.*;
@@ -175,7 +176,7 @@ public final class JobDriver {
this.clrProcessFactory = clrProcessFactory;
}
- private void setupBridge(final StartTime startTime) {
+ private void setupBridge(final ClrHandlersInitializer initializer) {
// Signal to the clr buffered log handler that the driver has started and
that
// we can begin logging
LOG.log(Level.INFO, "Initializing CLRBufferedLogHandler...");
@@ -195,13 +196,10 @@ public final class JobDriver {
LOG.log(Level.INFO, "CLRBufferedLogHandler init complete.");
}
- LOG.log(Level.INFO, "StartTime: {0}", new Object[]{startTime});
final String portNumber = httpServer == null ? null :
Integer.toString((httpServer.getPort()));
final EvaluatorRequestorBridge evaluatorRequestorBridge =
new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor,
false, loggingScopeFactory);
- final long[] handlers = JobDriver.this.isRestarted ?
-
NativeInterop.callClrSystemOnRestartHandlerOnNext(startTime.toString(),
portNumber, evaluatorRequestorBridge)
- : NativeInterop.callClrSystemOnStartHandler(startTime.toString(),
portNumber, evaluatorRequestorBridge);
+ final long[] handlers = initializer.getClrHandlers(portNumber,
evaluatorRequestorBridge);
if (handlers != null) {
if (handlers.length != NativeInterop.N_HANDLERS) {
throw new RuntimeException(
@@ -579,7 +577,7 @@ public final class JobDriver {
try (final LoggingScope ls = loggingScopeFactory.driverStart(startTime))
{
synchronized (JobDriver.this) {
- setupBridge(startTime);
+ setupBridge(new DriverStartClrHandlersInitializer(startTime));
LOG.log(Level.INFO, "Driver Started");
}
}
@@ -590,14 +588,14 @@ public final class JobDriver {
/**
* Job driver is restarted after previous crash.
*/
- public final class RestartHandler implements EventHandler<StartTime> {
+ public final class RestartHandler implements EventHandler<DriverRestarted> {
@Override
- public void onNext(final StartTime startTime) {
- try (final LoggingScope ls =
loggingScopeFactory.driverRestart(startTime)) {
+ public void onNext(final DriverRestarted driverRestarted) {
+ try (final LoggingScope ls =
loggingScopeFactory.driverRestart(driverRestarted.getStartTime())) {
synchronized (JobDriver.this) {
JobDriver.this.isRestarted = true;
- setupBridge(startTime);
+ setupBridge(new
DriverRestartClrHandlersInitializer(driverRestarted));
LOG.log(Level.INFO, "Driver Restarted and CLR bridge set up.");
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
index 185440e..e0e884f 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
@@ -24,6 +24,7 @@ import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Public;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.parameters.*;
+import org.apache.reef.driver.restart.DriverRestarted;
import org.apache.reef.driver.task.RunningTask;
import org.apache.reef.runtime.common.DriverRestartCompleted;
import org.apache.reef.tang.formats.ConfigurationModule;
@@ -31,7 +32,6 @@ import
org.apache.reef.tang.formats.ConfigurationModuleBuilder;
import org.apache.reef.tang.formats.OptionalImpl;
import org.apache.reef.tang.formats.OptionalParameter;
import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.event.StartTime;
/**
* EventHandlers specific to Driver Restart. Please remember to bind a
runtime-specific DriverRestartConfiguration,
@@ -45,7 +45,7 @@ public final class DriverRestartConfiguration extends
ConfigurationModuleBuilder
/**
* This event is fired in place of the ON_DRIVER_STARTED when the Driver is
in fact restarted after failure.
*/
- public static final OptionalImpl<EventHandler<StartTime>>
ON_DRIVER_RESTARTED = new OptionalImpl<>();
+ public static final OptionalImpl<EventHandler<DriverRestarted>>
ON_DRIVER_RESTARTED = new OptionalImpl<>();
/**
* Event handler for running tasks in previous evaluator, when driver
restarted. Defaults to crash if not bound.
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
index 4fe0452..8ac1519 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
@@ -29,6 +29,7 @@ import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.CompletedEvaluator;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.parameters.*;
+import org.apache.reef.driver.restart.DriverRestarted;
import org.apache.reef.driver.task.*;
import org.apache.reef.tang.formats.*;
import org.apache.reef.wake.EventHandler;
@@ -77,7 +78,7 @@ public final class DriverServiceConfiguration extends
ConfigurationModuleBuilder
/**
* The event handler invoked right after the driver restarts.
*/
- public static final OptionalImpl<EventHandler<StartTime>>
ON_DRIVER_RESTARTED = new OptionalImpl<>();
+ public static final OptionalImpl<EventHandler<DriverRestarted>>
ON_DRIVER_RESTARTED = new OptionalImpl<>();
/**
* The event handler invoked right before the driver shuts down. Defaults to
ignore.
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartHandler.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartHandler.java
index 11b8bb3..18f831d 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartHandler.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartHandler.java
@@ -18,17 +18,18 @@
*/
package org.apache.reef.driver.parameters;
+import org.apache.reef.driver.restart.DriverRestarted;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.event.StartTime;
import java.util.Set;
/**
- * The StartTime event is routed to this EventHandler if there is a restart,
instead of to DriverStartHandler.
+ * The EventHandler invoked on Driver restart. Provides the set of Evaluator
IDs of Evaluators that are expected to
+ * report back to the Driver on restart as well as the time of restart.
*/
-@NamedParameter(doc = "The StartTime event is routed to this EventHandler if
there is a restart, " +
- "instead of to DriverStartHandler.")
-public final class DriverRestartHandler implements
Name<Set<EventHandler<StartTime>>> {
+@NamedParameter(doc = "The EventHandler invoked on Driver restart. Provides
the set of Evaluator IDs of " +
+ "Evaluators that are expected to report back to the Driver on restart as
well as the time of restart.")
+public final class DriverRestartHandler implements
Name<Set<EventHandler<DriverRestarted>>> {
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartedHandlers.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartedHandlers.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartedHandlers.java
index 41d7f95..e0bc52c 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartedHandlers.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartedHandlers.java
@@ -18,10 +18,10 @@
*/
package org.apache.reef.driver.parameters;
+import org.apache.reef.driver.restart.DriverRestarted;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.event.StartTime;
import java.util.Set;
@@ -29,7 +29,7 @@ import java.util.Set;
* Service Handler for driver restarts.
*/
@NamedParameter(doc = "Service Handler for driver restarts.")
-public final class ServiceDriverRestartedHandlers implements
Name<Set<EventHandler<StartTime>>> {
+public final class ServiceDriverRestartedHandlers implements
Name<Set<EventHandler<DriverRestarted>>> {
private ServiceDriverRestartedHandlers(){
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
index 0557a2f..3ff1c8f 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
@@ -31,6 +31,7 @@ import org.apache.reef.runtime.common.driver.idle.IdleMessage;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;
import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceRecoverEvent;
+import org.apache.reef.wake.time.event.StartTime;
import javax.inject.Inject;
import java.util.*;
@@ -97,13 +98,20 @@ public final class DriverRestartManager implements
DriverIdlenessSource {
}
/**
- * Recovers the list of alive and failed evaluators and inform about
evaluator failures
- * based on the specific runtime. Also sets the expected amount of
evaluators to report back
- * as alive to the job driver.
+ * Recovers the list of alive and failed evaluators and inform the driver
restart handlers and inform the
+ * evaluator failure handlers based on the specific runtime. Also sets the
expected amount of evaluators to report
+ * back as alive to the job driver.
*/
- public synchronized void onRestart() {
+ public synchronized void onRestart(final StartTime startTime,
+ final List<EventHandler<DriverRestarted>>
orderedHandlers) {
if (this.state == DriverRestartState.BEGAN) {
restartEvaluators = driverRuntimeRestartManager.getPreviousEvaluators();
+ final DriverRestarted restartedInfo = new DriverRestartedImpl(startTime,
restartEvaluators);
+
+ for (final EventHandler<DriverRestarted> handler : orderedHandlers) {
+ handler.onNext(restartedInfo);
+ }
+
this.state = DriverRestartState.IN_PROGRESS;
} else {
final String errMsg = "Should not be setting the set of expected alive
evaluators more than once.";
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestarted.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestarted.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestarted.java
new file mode 100644
index 0000000..53f9149
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestarted.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.wake.time.event.StartTime;
+
+import java.util.Set;
+
+/**
+ * Am event encapsulating the time of Driver restart as well as
+ * the set of Evaluator IDs of Evaluators that are expected to
+ * report back to the Driver after restart.
+ */
+@Public
+@Provided
+@Unstable
+public interface DriverRestarted {
+ /**
+ * @return The time of restart.
+ */
+ StartTime getStartTime();
+
+ /**
+ * @return The set of Evaluator IDs of Evaluators that are expected
+ * to report back to the Driver after restart.
+ */
+ Set<String> getExpectedEvaluatorIds();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartedImpl.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartedImpl.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartedImpl.java
new file mode 100644
index 0000000..db8aaa8
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartedImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.wake.time.event.StartTime;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * @see DriverRestarted
+ */
+@DriverSide
+@Private
+@Unstable
+public final class DriverRestartedImpl implements DriverRestarted {
+ private final StartTime startTime;
+ private final Set<String> expectedEvaluatorIds;
+
+ DriverRestartedImpl(final StartTime startTime, final RestartEvaluators
restartEvaluators) {
+ this.startTime = startTime;
+ Set<String> expected = new HashSet<>();
+
+ for (final String evaluatorId : restartEvaluators.getEvaluatorIds()) {
+ if (restartEvaluators.get(evaluatorId).getEvaluatorRestartState() ==
EvaluatorRestartState.EXPECTED) {
+ expected.add(evaluatorId);
+ }
+ }
+
+ this.expectedEvaluatorIds = Collections.unmodifiableSet(expected);
+ }
+
+ @Override
+ public StartTime getStartTime() {
+ return startTime;
+ }
+
+ @Override
+ public Set<String> getExpectedEvaluatorIds() {
+ return expectedEvaluatorIds;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
index 61845c3..abdecd7 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
@@ -21,12 +21,15 @@ package org.apache.reef.runtime.common.driver;
import org.apache.reef.driver.parameters.DriverRestartHandler;
import org.apache.reef.driver.parameters.ServiceDriverRestartedHandlers;
import org.apache.reef.driver.restart.DriverRestartManager;
+import org.apache.reef.driver.restart.DriverRestarted;
import org.apache.reef.exception.DriverFatalRuntimeException;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.time.event.StartTime;
import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -38,17 +41,17 @@ public final class DriverStartHandler implements
EventHandler<StartTime> {
private static final Logger LOG =
Logger.getLogger(DriverStartHandler.class.getName());
private final Set<EventHandler<StartTime>> startHandlers;
- private final Set<EventHandler<StartTime>> restartHandlers;
- private final Set<EventHandler<StartTime>> serviceRestartHandlers;
+ private final Set<EventHandler<DriverRestarted>> restartHandlers;
+ private final Set<EventHandler<DriverRestarted>> serviceRestartHandlers;
private final DriverRestartManager driverRestartManager;
@Inject
DriverStartHandler(@Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class)
final Set<EventHandler<StartTime>> startHandlers,
@Parameter(DriverRestartHandler.class)
- final Set<EventHandler<StartTime>> restartHandlers,
+ final Set<EventHandler<DriverRestarted>> restartHandlers,
@Parameter(ServiceDriverRestartedHandlers.class)
- final Set<EventHandler<StartTime>> serviceRestartHandlers,
+ final Set<EventHandler<DriverRestarted>>
serviceRestartHandlers,
final DriverRestartManager driverRestartManager) {
this.startHandlers = startHandlers;
this.restartHandlers = restartHandlers;
@@ -71,17 +74,15 @@ public final class DriverStartHandler implements
EventHandler<StartTime> {
private void onRestart(final StartTime startTime) {
if (this.restartHandlers.size() > 0) {
- for (EventHandler<StartTime> serviceRestartHandler :
this.serviceRestartHandlers) {
- serviceRestartHandler.onNext(startTime);
- }
+ final List<EventHandler<DriverRestarted>> orderedRestartHandlers =
+ new ArrayList<>(this.serviceRestartHandlers.size() +
this.restartHandlers.size());
- for (EventHandler<StartTime> restartHandler : this.restartHandlers){
- restartHandler.onNext(startTime);
- }
+ orderedRestartHandlers.addAll(this.serviceRestartHandlers);
+ orderedRestartHandlers.addAll(this.restartHandlers);
// This can only be called after calling client restart handlers because
REEF.NET
// JobDriver requires making this call to set up the InterOp handlers.
- this.driverRestartManager.onRestart();
+ this.driverRestartManager.onRestart(startTime, orderedRestartHandlers);
} else {
throw new DriverFatalRuntimeException("Driver restart happened, but no
ON_DRIVER_RESTART handler is bound.");
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriverRestart.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriverRestart.java
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriverRestart.java
index 9ef90bb..2a33c8c 100644
---
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriverRestart.java
+++
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriverRestart.java
@@ -18,9 +18,9 @@
*/
package org.apache.reef.examples.hello;
+import org.apache.reef.driver.restart.DriverRestarted;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.event.StartTime;
import javax.inject.Inject;
import java.util.logging.Level;
@@ -41,10 +41,10 @@ public final class HelloDriverRestart {
/**
* Handles Restarts. Prints a message.
*/
- public final class DriverRestartHandler implements EventHandler<StartTime> {
+ public final class DriverRestartHandler implements
EventHandler<DriverRestarted> {
@Override
- public void onNext(final StartTime value) {
- LOG.log(Level.INFO, "Hello, driver restarted at " + value);
+ public void onNext(final DriverRestarted value) {
+ LOG.log(Level.INFO, "Hello, driver restarted at " +
value.getStartTime());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java
b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java
index 0f2aa73..29c9457 100644
---
a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java
+++
b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java
@@ -22,6 +22,7 @@ import org.apache.reef.driver.catalog.NodeDescriptor;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.driver.restart.DriverRestarted;
import org.apache.reef.driver.task.RunningTask;
import org.apache.reef.runtime.common.driver.DriverStatusManager;
import org.apache.reef.runtime.common.utils.RemoteManager;
@@ -229,10 +230,10 @@ public final class ReefEventStateManager {
/**
* Job Driver has been restarted.
*/
- public final class DriverRestartHandler implements EventHandler<StartTime> {
+ public final class DriverRestartHandler implements
EventHandler<DriverRestarted> {
@Override
@SuppressWarnings("checkstyle:hiddenfield")
- public void onNext(final StartTime restartTime) {
+ public void onNext(final DriverRestarted restartTime) {
LOG.log(Level.INFO, "DriverRestartHandler called. StartTime: {0}",
restartTime);
}
}