Repository: incubator-reef
Updated Branches:
  refs/heads/master fdda92bc7 -> 2cd1e4bf3


[REEF-687] Restructure handler for DriverRestart in Java

This addresses the issue by
  * Creating a `DriverRestarted` interface that provides the set of
    Evaluator IDs for Evaluators that are expected to report back.
  * Adding a `ClrHandlersInitializer` interface that allows implementing
    classes to initialize CLR EventHandlers based on Driver
    condition.
  * Changing `DriverRestartHandlers` to implement
    `EventHandler<DriverRestarted>`.

JIRA:
  [REEF-687](https://issues.apache.org/jira/browse/REEF-687)

Pull Request:
  This closes #438


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/2cd1e4bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/2cd1e4bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/2cd1e4bf

Branch: refs/heads/master
Commit: 2cd1e4bf3b4e3af53eabea6a92cd9ddc0218cb61
Parents: fdda92b
Author: Andrew Chung <[email protected]>
Authored: Fri Aug 28 21:02:32 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Aug 31 13:21:33 2015 -0700

----------------------------------------------------------------------
 .../generic/ClrHandlersInitializer.java         | 38 ++++++++++++
 .../DriverRestartClrHandlersInitializer.java    | 48 +++++++++++++++
 .../DriverStartClrHandlersInitializer.java      | 46 +++++++++++++++
 .../reef/javabridge/generic/JobDriver.java      | 18 +++---
 .../reef/client/DriverRestartConfiguration.java |  4 +-
 .../reef/client/DriverServiceConfiguration.java |  3 +-
 .../driver/parameters/DriverRestartHandler.java | 11 ++--
 .../ServiceDriverRestartedHandlers.java         |  4 +-
 .../driver/restart/DriverRestartManager.java    | 16 +++--
 .../reef/driver/restart/DriverRestarted.java    | 47 +++++++++++++++
 .../driver/restart/DriverRestartedImpl.java     | 62 ++++++++++++++++++++
 .../common/driver/DriverStartHandler.java       | 23 ++++----
 .../reef/examples/hello/HelloDriverRestart.java |  8 +--
 .../reef/webserver/ReefEventStateManager.java   |  5 +-
 14 files changed, 292 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java
 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java
new file mode 100644
index 0000000..34968f9
--- /dev/null
+++ 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge.generic;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.javabridge.EvaluatorRequestorBridge;
+
+/**
+ * An initializer interface that initializes ClrHandlers for the CLR {@link 
JobDriver}.
+ */
+@DriverSide
+@Private
+@Unstable
+interface ClrHandlersInitializer {
+
+  /**
+   * Returns the set of CLR handles.
+   */
+  long[] getClrHandlers(final String portNumber, final 
EvaluatorRequestorBridge evaluatorRequestorBridge);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java
 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java
new file mode 100644
index 0000000..d17c502
--- /dev/null
+++ 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge.generic;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.restart.DriverRestarted;
+import org.apache.reef.javabridge.EvaluatorRequestorBridge;
+import org.apache.reef.javabridge.NativeInterop;
+
+/**
+ * An initializer implementation that initializes ClrHandlers for the CLR 
{@link JobDriver} for the case
+ * where the Driver has restarted.
+ */
+@Private
+@DriverSide
+@Unstable
+final class DriverRestartClrHandlersInitializer implements 
ClrHandlersInitializer {
+  private final DriverRestarted driverRestarted;
+
+  DriverRestartClrHandlersInitializer(final DriverRestarted driverRestarted) {
+    this.driverRestarted = driverRestarted;
+  }
+
+  @Override
+  public long[] getClrHandlers(final String portNumber, final 
EvaluatorRequestorBridge evaluatorRequestorBridge) {
+    // TODO[REEF-689]: Make callClrSystemOnRestartedHandlerOnNext take 
DriverRestarted object.
+    return NativeInterop.callClrSystemOnRestartHandlerOnNext(
+        driverRestarted.getStartTime().toString(), portNumber, 
evaluatorRequestorBridge);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java
 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java
new file mode 100644
index 0000000..117bfbd
--- /dev/null
+++ 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge.generic;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.javabridge.EvaluatorRequestorBridge;
+import org.apache.reef.javabridge.NativeInterop;
+import org.apache.reef.wake.time.event.StartTime;
+
+/**
+ * An initializer implementation that initializes ClrHandlers for the CLR 
{@link JobDriver} for the case
+ * of regular Driver startup.
+ */
+@Private
+@DriverSide
+@Unstable
+final class DriverStartClrHandlersInitializer implements 
ClrHandlersInitializer {
+  private final StartTime startTime;
+
+  DriverStartClrHandlersInitializer(final StartTime startTime) {
+    this.startTime = startTime;
+  }
+
+  @Override
+  public long[] getClrHandlers(final String portNumber, final 
EvaluatorRequestorBridge evaluatorRequestorBridge) {
+    return NativeInterop.callClrSystemOnStartHandler(startTime.toString(), 
portNumber, evaluatorRequestorBridge);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
index ea93ebc..212918e 100644
--- 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
+++ 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
@@ -24,6 +24,7 @@ import org.apache.reef.driver.context.ClosedContext;
 import org.apache.reef.driver.context.ContextMessage;
 import org.apache.reef.driver.context.FailedContext;
 import org.apache.reef.driver.evaluator.*;
+import org.apache.reef.driver.restart.DriverRestarted;
 import org.apache.reef.driver.task.*;
 import org.apache.reef.io.network.naming.NameServer;
 import org.apache.reef.javabridge.*;
@@ -175,7 +176,7 @@ public final class JobDriver {
     this.clrProcessFactory = clrProcessFactory;
   }
 
-  private void setupBridge(final StartTime startTime) {
+  private void setupBridge(final ClrHandlersInitializer initializer) {
     // Signal to the clr buffered log handler that the driver has started and 
that
     // we can begin logging
     LOG.log(Level.INFO, "Initializing CLRBufferedLogHandler...");
@@ -195,13 +196,10 @@ public final class JobDriver {
         LOG.log(Level.INFO, "CLRBufferedLogHandler init complete.");
       }
 
-      LOG.log(Level.INFO, "StartTime: {0}", new Object[]{startTime});
       final String portNumber = httpServer == null ? null : 
Integer.toString((httpServer.getPort()));
       final EvaluatorRequestorBridge evaluatorRequestorBridge =
           new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, 
false, loggingScopeFactory);
-      final long[] handlers = JobDriver.this.isRestarted ?
-          
NativeInterop.callClrSystemOnRestartHandlerOnNext(startTime.toString(), 
portNumber, evaluatorRequestorBridge)
-          : NativeInterop.callClrSystemOnStartHandler(startTime.toString(), 
portNumber, evaluatorRequestorBridge);
+      final long[] handlers = initializer.getClrHandlers(portNumber, 
evaluatorRequestorBridge);
       if (handlers != null) {
         if (handlers.length != NativeInterop.N_HANDLERS) {
           throw new RuntimeException(
@@ -579,7 +577,7 @@ public final class JobDriver {
       try (final LoggingScope ls = loggingScopeFactory.driverStart(startTime)) 
{
         synchronized (JobDriver.this) {
 
-          setupBridge(startTime);
+          setupBridge(new DriverStartClrHandlersInitializer(startTime));
           LOG.log(Level.INFO, "Driver Started");
         }
       }
@@ -590,14 +588,14 @@ public final class JobDriver {
   /**
    * Job driver is restarted after previous crash.
    */
-  public final class RestartHandler implements EventHandler<StartTime> {
+  public final class RestartHandler implements EventHandler<DriverRestarted> {
     @Override
-    public void onNext(final StartTime startTime) {
-      try (final LoggingScope ls = 
loggingScopeFactory.driverRestart(startTime)) {
+    public void onNext(final DriverRestarted driverRestarted) {
+      try (final LoggingScope ls = 
loggingScopeFactory.driverRestart(driverRestarted.getStartTime())) {
         synchronized (JobDriver.this) {
 
           JobDriver.this.isRestarted = true;
-          setupBridge(startTime);
+          setupBridge(new 
DriverRestartClrHandlersInitializer(driverRestarted));
 
           LOG.log(Level.INFO, "Driver Restarted and CLR bridge set up.");
         }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
index 185440e..e0e884f 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
@@ -24,6 +24,7 @@ import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Public;
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.driver.parameters.*;
+import org.apache.reef.driver.restart.DriverRestarted;
 import org.apache.reef.driver.task.RunningTask;
 import org.apache.reef.runtime.common.DriverRestartCompleted;
 import org.apache.reef.tang.formats.ConfigurationModule;
@@ -31,7 +32,6 @@ import 
org.apache.reef.tang.formats.ConfigurationModuleBuilder;
 import org.apache.reef.tang.formats.OptionalImpl;
 import org.apache.reef.tang.formats.OptionalParameter;
 import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.event.StartTime;
 
 /**
  * EventHandlers specific to Driver Restart. Please remember to bind a 
runtime-specific DriverRestartConfiguration,
@@ -45,7 +45,7 @@ public final class DriverRestartConfiguration extends 
ConfigurationModuleBuilder
   /**
    * This event is fired in place of the ON_DRIVER_STARTED when the Driver is 
in fact restarted after failure.
    */
-  public static final OptionalImpl<EventHandler<StartTime>> 
ON_DRIVER_RESTARTED = new OptionalImpl<>();
+  public static final OptionalImpl<EventHandler<DriverRestarted>> 
ON_DRIVER_RESTARTED = new OptionalImpl<>();
 
   /**
    * Event handler for running tasks in previous evaluator, when driver 
restarted. Defaults to crash if not bound.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
index 4fe0452..8ac1519 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
@@ -29,6 +29,7 @@ import org.apache.reef.driver.evaluator.AllocatedEvaluator;
 import org.apache.reef.driver.evaluator.CompletedEvaluator;
 import org.apache.reef.driver.evaluator.FailedEvaluator;
 import org.apache.reef.driver.parameters.*;
+import org.apache.reef.driver.restart.DriverRestarted;
 import org.apache.reef.driver.task.*;
 import org.apache.reef.tang.formats.*;
 import org.apache.reef.wake.EventHandler;
@@ -77,7 +78,7 @@ public final class DriverServiceConfiguration extends 
ConfigurationModuleBuilder
   /**
    * The event handler invoked right after the driver restarts.
    */
-  public static final OptionalImpl<EventHandler<StartTime>> 
ON_DRIVER_RESTARTED = new OptionalImpl<>();
+  public static final OptionalImpl<EventHandler<DriverRestarted>> 
ON_DRIVER_RESTARTED = new OptionalImpl<>();
 
   /**
    * The event handler invoked right before the driver shuts down. Defaults to 
ignore.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartHandler.java
index 11b8bb3..18f831d 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartHandler.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartHandler.java
@@ -18,17 +18,18 @@
  */
 package org.apache.reef.driver.parameters;
 
+import org.apache.reef.driver.restart.DriverRestarted;
 import org.apache.reef.tang.annotations.Name;
 import org.apache.reef.tang.annotations.NamedParameter;
 import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.event.StartTime;
 
 import java.util.Set;
 
 /**
- * The StartTime event is routed to this EventHandler if there is a restart, 
instead of to DriverStartHandler.
+ * The EventHandler invoked on Driver restart. Provides the set of Evaluator 
IDs of Evaluators that are expected to
+ * report back to the Driver on restart as well as the time of restart.
  */
-@NamedParameter(doc = "The StartTime event is routed to this EventHandler if 
there is a restart, " +
-    "instead of to DriverStartHandler.")
-public final class DriverRestartHandler implements 
Name<Set<EventHandler<StartTime>>> {
+@NamedParameter(doc = "The EventHandler invoked on Driver restart. Provides 
the set of Evaluator IDs of " +
+    "Evaluators that are expected to report back to the Driver on restart as 
well as the time of restart.")
+public final class DriverRestartHandler implements 
Name<Set<EventHandler<DriverRestarted>>> {
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartedHandlers.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartedHandlers.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartedHandlers.java
index 41d7f95..e0bc52c 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartedHandlers.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartedHandlers.java
@@ -18,10 +18,10 @@
  */
 package org.apache.reef.driver.parameters;
 
+import org.apache.reef.driver.restart.DriverRestarted;
 import org.apache.reef.tang.annotations.Name;
 import org.apache.reef.tang.annotations.NamedParameter;
 import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.event.StartTime;
 
 import java.util.Set;
 
@@ -29,7 +29,7 @@ import java.util.Set;
  * Service Handler for driver restarts.
  */
 @NamedParameter(doc = "Service Handler for driver restarts.")
-public final class ServiceDriverRestartedHandlers implements 
Name<Set<EventHandler<StartTime>>> {
+public final class ServiceDriverRestartedHandlers implements 
Name<Set<EventHandler<DriverRestarted>>> {
   private ServiceDriverRestartedHandlers(){
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
index 0557a2f..3ff1c8f 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
@@ -31,6 +31,7 @@ import org.apache.reef.runtime.common.driver.idle.IdleMessage;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.wake.EventHandler;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceRecoverEvent;
+import org.apache.reef.wake.time.event.StartTime;
 
 import javax.inject.Inject;
 import java.util.*;
@@ -97,13 +98,20 @@ public final class DriverRestartManager implements 
DriverIdlenessSource {
   }
 
   /**
-   * Recovers the list of alive and failed evaluators and inform about 
evaluator failures
-   * based on the specific runtime. Also sets the expected amount of 
evaluators to report back
-   * as alive to the job driver.
+   * Recovers the list of alive and failed evaluators, and informs the driver 
restart handlers and the
+   * evaluator failure handlers based on the specific runtime. Also sets the 
expected number of evaluators to report
+   * back as alive to the job driver.
    */
-  public synchronized void onRestart() {
+  public synchronized void onRestart(final StartTime startTime,
+                                     final List<EventHandler<DriverRestarted>> 
orderedHandlers) {
     if (this.state == DriverRestartState.BEGAN) {
       restartEvaluators = driverRuntimeRestartManager.getPreviousEvaluators();
+      final DriverRestarted restartedInfo = new DriverRestartedImpl(startTime, 
restartEvaluators);
+
+      for (final EventHandler<DriverRestarted> handler : orderedHandlers) {
+        handler.onNext(restartedInfo);
+      }
+
       this.state = DriverRestartState.IN_PROGRESS;
     } else {
       final String errMsg = "Should not be setting the set of expected alive 
evaluators more than once.";

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestarted.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestarted.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestarted.java
new file mode 100644
index 0000000..53f9149
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestarted.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.wake.time.event.StartTime;
+
+import java.util.Set;
+
+/**
+ * An event encapsulating the time of Driver restart as well as
+ * the set of Evaluator IDs of Evaluators that are expected to
+ * report back to the Driver after restart.
+ */
+@Public
+@Provided
+@Unstable
+public interface DriverRestarted {
+  /**
+   * @return The time of restart.
+   */
+  StartTime getStartTime();
+
+  /**
+   * @return The set of Evaluator IDs of Evaluators that are expected
+   * to report back to the Driver after restart.
+   */
+  Set<String> getExpectedEvaluatorIds();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartedImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartedImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartedImpl.java
new file mode 100644
index 0000000..db8aaa8
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartedImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.wake.time.event.StartTime;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * @see DriverRestarted
+ */
+@DriverSide
+@Private
+@Unstable
+public final class DriverRestartedImpl implements DriverRestarted {
+  private final StartTime startTime;
+  private final Set<String> expectedEvaluatorIds;
+
+  DriverRestartedImpl(final StartTime startTime, final RestartEvaluators 
restartEvaluators) {
+    this.startTime = startTime;
+    Set<String> expected = new HashSet<>();
+
+    for (final String evaluatorId : restartEvaluators.getEvaluatorIds()) {
+      if (restartEvaluators.get(evaluatorId).getEvaluatorRestartState() == 
EvaluatorRestartState.EXPECTED) {
+        expected.add(evaluatorId);
+      }
+    }
+
+    this.expectedEvaluatorIds = Collections.unmodifiableSet(expected);
+  }
+
+  @Override
+  public StartTime getStartTime() {
+    return startTime;
+  }
+
+  @Override
+  public Set<String> getExpectedEvaluatorIds() {
+    return expectedEvaluatorIds;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
index 61845c3..abdecd7 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
@@ -21,12 +21,15 @@ package org.apache.reef.runtime.common.driver;
 import org.apache.reef.driver.parameters.DriverRestartHandler;
 import org.apache.reef.driver.parameters.ServiceDriverRestartedHandlers;
 import org.apache.reef.driver.restart.DriverRestartManager;
+import org.apache.reef.driver.restart.DriverRestarted;
 import org.apache.reef.exception.DriverFatalRuntimeException;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.time.event.StartTime;
 
 import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -38,17 +41,17 @@ public final class DriverStartHandler implements 
EventHandler<StartTime> {
   private static final Logger LOG = 
Logger.getLogger(DriverStartHandler.class.getName());
 
   private final Set<EventHandler<StartTime>> startHandlers;
-  private final Set<EventHandler<StartTime>> restartHandlers;
-  private final Set<EventHandler<StartTime>> serviceRestartHandlers;
+  private final Set<EventHandler<DriverRestarted>> restartHandlers;
+  private final Set<EventHandler<DriverRestarted>> serviceRestartHandlers;
   private final DriverRestartManager driverRestartManager;
 
   @Inject
   
DriverStartHandler(@Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class)
                      final Set<EventHandler<StartTime>> startHandlers,
                      @Parameter(DriverRestartHandler.class)
-                     final Set<EventHandler<StartTime>> restartHandlers,
+                     final Set<EventHandler<DriverRestarted>> restartHandlers,
                      @Parameter(ServiceDriverRestartedHandlers.class)
-                     final Set<EventHandler<StartTime>> serviceRestartHandlers,
+                     final Set<EventHandler<DriverRestarted>> 
serviceRestartHandlers,
                      final DriverRestartManager driverRestartManager) {
     this.startHandlers = startHandlers;
     this.restartHandlers = restartHandlers;
@@ -71,17 +74,15 @@ public final class DriverStartHandler implements 
EventHandler<StartTime> {
 
   private void onRestart(final StartTime startTime) {
     if (this.restartHandlers.size() > 0) {
-      for (EventHandler<StartTime> serviceRestartHandler : 
this.serviceRestartHandlers) {
-        serviceRestartHandler.onNext(startTime);
-      }
+      final List<EventHandler<DriverRestarted>> orderedRestartHandlers =
+          new ArrayList<>(this.serviceRestartHandlers.size() + 
this.restartHandlers.size());
 
-      for (EventHandler<StartTime> restartHandler : this.restartHandlers){
-        restartHandler.onNext(startTime);
-      }
+      orderedRestartHandlers.addAll(this.serviceRestartHandlers);
+      orderedRestartHandlers.addAll(this.restartHandlers);
 
       // This can only be called after calling client restart handlers because 
REEF.NET
       // JobDriver requires making this call to set up the InterOp handlers.
-      this.driverRestartManager.onRestart();
+      this.driverRestartManager.onRestart(startTime, orderedRestartHandlers);
     } else {
       throw new DriverFatalRuntimeException("Driver restart happened, but no 
ON_DRIVER_RESTART handler is bound.");
     }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriverRestart.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriverRestart.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriverRestart.java
index 9ef90bb..2a33c8c 100644
--- 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriverRestart.java
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriverRestart.java
@@ -18,9 +18,9 @@
  */
 package org.apache.reef.examples.hello;
 
+import org.apache.reef.driver.restart.DriverRestarted;
 import org.apache.reef.tang.annotations.Unit;
 import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.event.StartTime;
 
 import javax.inject.Inject;
 import java.util.logging.Level;
@@ -41,10 +41,10 @@ public final class HelloDriverRestart {
   /**
    * Handles Restarts. Prints a message.
    */
-  public final class DriverRestartHandler implements EventHandler<StartTime> {
+  public final class DriverRestartHandler implements 
EventHandler<DriverRestarted> {
     @Override
-    public void onNext(final StartTime value) {
-      LOG.log(Level.INFO, "Hello, driver restarted at " + value);
+    public void onNext(final DriverRestarted value) {
+      LOG.log(Level.INFO, "Hello, driver restarted at " + 
value.getStartTime());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2cd1e4bf/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java
 
b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java
index 0f2aa73..29c9457 100644
--- 
a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java
+++ 
b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java
@@ -22,6 +22,7 @@ import org.apache.reef.driver.catalog.NodeDescriptor;
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.driver.evaluator.AllocatedEvaluator;
 import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.driver.restart.DriverRestarted;
 import org.apache.reef.driver.task.RunningTask;
 import org.apache.reef.runtime.common.driver.DriverStatusManager;
 import org.apache.reef.runtime.common.utils.RemoteManager;
@@ -229,10 +230,10 @@ public final class ReefEventStateManager {
   /**
    * Job Driver has been restarted.
    */
-  public final class DriverRestartHandler implements EventHandler<StartTime> {
+  public final class DriverRestartHandler implements 
EventHandler<DriverRestarted> {
     @Override
     @SuppressWarnings("checkstyle:hiddenfield")
-    public void onNext(final StartTime restartTime) {
+    public void onNext(final DriverRestarted restartTime) {
       LOG.log(Level.INFO, "DriverRestartHandler called. StartTime: {0}", 
restartTime);
     }
   }

Reply via email to