Adapted test cases to actor model after rebasing.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/bd4ee47b Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/bd4ee47b Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/bd4ee47b Branch: refs/heads/master Commit: bd4ee47bb6b15c695e6c6946475c2d38099bad92 Parents: b8d0a0a Author: Till Rohrmann <[email protected]> Authored: Fri Nov 14 11:35:45 2014 +0100 Committer: Till Rohrmann <[email protected]> Committed: Thu Dec 18 18:58:31 2014 +0100 ---------------------------------------------------------------------- .../flink/streaming/util/ClusterUtil.java | 1 + .../util/AbstractRuntimeUDFContext.java | 7 +- flink-runtime/pom.xml | 7 + .../runtime/execution/RuntimeEnvironment.java | 2 +- .../librarycache/BlobLibraryCacheManager.java | 3 +- .../runtime/executiongraph/ExecutionGraph.java | 33 ++- .../instance/InstanceConnectionInfo.java | 2 +- .../apache/flink/runtime/jobgraph/JobGraph.java | 2 - .../jobmanager/web/JobmanagerInfoServlet.java | 3 +- .../runtime/jobmanager/web/JsonFactory.java | 3 +- .../taskmanager/TaskInputSplitProvider.java | 2 +- .../flink/runtime/jobmanager/JobManager.scala | 35 ++- .../runtime/messages/JobmanagerMessages.scala | 4 +- .../runtime/minicluster/FlinkMiniCluster.scala | 27 +- .../minicluster/LocalFlinkMiniCluster.scala | 34 +-- .../flink/runtime/taskmanager/TaskManager.scala | 39 +-- .../BlobLibraryCacheManagerTest.java | 3 +- .../ExecutionGraphRestartTest.java | 127 --------- .../executiongraph/ExecutionGraphTestUtils.java | 16 +- .../ExecutionVertexSchedulingTest.java | 31 ++- .../TaskManagerLossFailsTasksTest.java | 79 ------ .../instance/InstanceConnectionInfoTest.java | 4 +- .../runtime/jobmanager/RecoveryITCase.java | 277 ------------------- .../jobmanager/tasks/ReceiverBlockingOnce.java | 52 ---- .../jobmanager/tasks/ReceiverFailingOnce.java | 50 ---- .../flink/runtime/taskmanager/TaskTest.java | 4 +- .../ExecutionGraphRestartTest.scala | 128 +++++++++ .../TaskManagerLossFailsTasksTest.scala | 79 ++++++ .../runtime/jobmanager/RecoveryITCase.scala | 176 ++++++++++++ .../apache/flink/runtime/jobmanager/Tasks.scala | 25 ++ .../runtime/testingUtils/TestingCluster.scala | 2 +- .../testingUtils/TestingTaskManager.scala | 5 +- .../TestingTaskManagerMessages.scala | 7 +- .../runtime/testingUtils/TestingUtils.scala | 1 + flink-test-utils/pom.xml | 6 + .../flink/test/util/AbstractTestBase.java | 44 ++- .../flink/test/util/JavaProgramTestBase.java | 6 +- .../apache/flink/test/util/FailingTestBase.java | 6 +- 38 files changed, 622 insertions(+), 710 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java index a103dcb..bf5ba73 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.util; import java.net.InetSocketAddress; +import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.client.program.Client; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java index 57d1261..6b755e1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.functions.util; +import java.io.Serializable; import java.util.HashMap; import java.util.Map; import java.util.concurrent.FutureTask; @@ -98,7 +99,7 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext { } @Override - public <V, A> void addAccumulator(String name, Accumulator<V, A> accumulator) { + public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) { if (accumulators.containsKey(name)) { throw new UnsupportedOperationException("The counter '" + name + "' already exists and cannot be added."); @@ -108,7 +109,7 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext { @SuppressWarnings("unchecked") @Override - public <V, A> Accumulator<V, A> getAccumulator(String name) { + public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) { return (Accumulator<V, A>) accumulators.get(name); } @@ -130,7 +131,7 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext { // -------------------------------------------------------------------------------------------- @SuppressWarnings("unchecked") - private <V, A> Accumulator<V, A> getAccumulator(String name, + private <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name, Class<? extends Accumulator<V, A>> accumulatorClass) { Accumulator<?, ?> accumulator = accumulators.get(name); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/pom.xml ---------------------------------------------------------------------- diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index 6922e7b..f04475c 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -331,6 +331,13 @@ under the License. </manifest> </archive> </configuration> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java index efd4bbb..e91344f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java @@ -136,7 +136,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf ClassLoader userCodeClassLoader, MemoryManager memoryManager, IOManager ioManager, InputSplitProvider inputSplitProvider, - ActorRef accumulator) + ActorRef accumulator, BroadcastVariableManager bcVarManager) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java index 0bb6fd3..c9f96ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java @@ -70,7 +70,8 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC private final BlobService blobService; // -------------------------------------------------------------------------------------------- - + + public BlobLibraryCacheManager(BlobService blobService, long cleanupInterval){ this.blobService = blobService; // Initializing the clean up task http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 4e6a56b..915140c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -26,11 +26,13 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import akka.actor.ActorRef; +import org.apache.flink.runtime.akka.AkkaUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.configuration.Configuration; @@ -54,6 +56,8 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged; import org.apache.flink.util.ExceptionUtils; +import static akka.dispatch.Futures.future; + public class ExecutionGraph { @@ -434,18 +438,18 @@ public class ExecutionGraph { if (current == JobStatus.FAILING) { if (numberOfRetriesLeft > 0 && transitionState(current, JobStatus.RESTARTING)) { numberOfRetriesLeft--; - - execute(new Runnable() { + future(new Callable<Object>() { @Override - public void run() { - try { + public Object call() throws Exception { + try{ Thread.sleep(delayBeforeRetrying); - } catch (InterruptedException e) { + }catch(InterruptedException e){ // should only happen on shutdown } restart(); + return null; } - }); + }, AkkaUtils.globalExecutionContext()); break; } else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) { @@ -720,9 +724,7 @@ public class ExecutionGraph { fail(error); } } - } - } - + public void restart() { try { if (state == JobStatus.FAILED) { @@ -735,23 +737,24 @@ public class ExecutionGraph { if (scheduler == null) { throw new IllegalStateException("The execution graph has not been schedudled before - scheduler is null."); } - + this.currentExecutions.clear(); this.edges.clear(); - + for (ExecutionJobVertex jv : this.verticesInCreationOrder) { jv.resetForNewExecution(); } - + for (int i = 0; i < stateTimestamps.length; i++) { stateTimestamps[i] = 0; } nextVertexToFinish = 0; transitionState(JobStatus.RESTARTING, JobStatus.CREATED); } - + scheduleForExecution(scheduler); - } - catch (Throwable t) { + } catch (Throwable t) { fail(t); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java index c574506..20a5601 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java @@ -194,7 +194,7 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In @Override public String toString() { - return hostname() + " ( dataPort=" + dataPort + ")"; + return getFQDNHostname() + " ( dataPort=" + dataPort + ")"; } @Override http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index 2db4ff4..3497824 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -331,8 +331,6 @@ public class JobGraph implements Serializable { } } - this.numExecutionRetries = in.readInt(); - out.writeInt(numExecutionRetries); // -------------------------------------------------------------------------------------------- // Handling of attached JAR files // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java index e1afa7a..b842a9b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java @@ -302,7 +302,8 @@ public class JobmanagerInfoServlet extends HttpServlet { wrt.write(","); } wrt.write("{"); - wrt.write("\"node\": \"" + (slot == null ? "(none)" : slot.getInstance().getInstanceConnectionInfo().hostname()) + "\","); + wrt.write("\"node\": \"" + (slot == null ? "(none)" : slot + .getInstance().getInstanceConnectionInfo().getFQDNHostname()) + "\","); wrt.write("\"message\": \"" + (failureCause == null ? "" : StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))) + "\""); wrt.write("}"); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java index 5fa19e2..8d48975 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java @@ -40,7 +40,8 @@ public class JsonFactory { json.append("\"vertexstatus\": \"" + vertex.getExecutionState() + "\","); AllocatedSlot slot = vertex.getCurrentAssignedResource(); - String instanceName = slot == null ? "(null)" : slot.getInstance().getInstanceConnectionInfo().hostname(); + String instanceName = slot == null ? "(null)" : slot.getInstance() + .getInstanceConnectionInfo().getFQDNHostname(); json.append("\"vertexinstancename\": \"" + instanceName + "\""); json.append("}"); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java index 669f94c..9853ded 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java @@ -39,7 +39,7 @@ public class TaskInputSplitProvider implements InputSplitProvider { private final FiniteDuration timeout; public TaskInputSplitProvider(ActorRef jobManager, JobID jobId, JobVertexID vertexId, - FiniteDuration timeout) { + FiniteDuration timeout) { this.jobManager = jobManager; this.jobId = jobId; this.vertexId = vertexId; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 1fa89c1..822a34c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit import akka.actor._ import akka.pattern.Patterns import akka.pattern.{ask, pipe} -import com.google.common.base.Preconditions import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration} import org.apache.flink.core.io.InputSplitAssigner import org.apache.flink.runtime.blob.BlobServer @@ -48,7 +47,7 @@ import org.apache.flink.runtime.profiling.ProfilingUtils import org.slf4j.LoggerFactory import scala.collection.convert.WrapAsScala -import scala.concurrent.{Await, Future} +import scala.concurrent.{Future} import scala.concurrent.duration._ class JobManager(val configuration: Configuration) extends @@ -61,7 +60,11 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { log.info("Starting job manager.") - val (archiveCount, profiling, cleanupInterval) = JobManager.parseConfiguration(configuration) + val (archiveCount, + profiling, + cleanupInterval, + defaultExecutionRetries, + delayBetweenRetries) = JobManager.parseConfiguration(configuration) // Props for the profiler actor def profilerProps: Props = Props(classOf[JobManagerProfiler]) @@ -128,13 +131,22 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { log.info(s"Received job ${jobGraph.getJobID} (${jobGraph.getName}}).") // Create the user code class loader - libraryCacheManager.register(jobGraph.getJobID, jobGraph.getUserJarBlobKeys) + libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys) val (executionGraph, jobInfo) = currentJobs.getOrElseUpdate(jobGraph.getJobID(), (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName, jobGraph.getJobConfiguration, jobGraph.getUserJarBlobKeys), JobInfo(sender(), System.currentTimeMillis()))) + val jobNumberRetries = if(jobGraph.getNumberOfExecutionRetries >= 0){ + jobGraph.getNumberOfExecutionRetries + }else{ + defaultExecutionRetries + } + + executionGraph.setNumberOfRetriesLeft(jobNumberRetries) + executionGraph.setDelayBeforeRetrying(delayBetweenRetries) + val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID) if (userCodeLoader == null) { @@ -203,7 +215,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { s"Cleanup job ${jobGraph.getJobID}.") } case None => - libraryCacheManager.unregister(jobGraph.getJobID) + libraryCacheManager.unregisterJob(jobGraph.getJobID) currentJobs.remove(jobGraph.getJobID) } @@ -380,7 +392,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala { } try { - libraryCacheManager.unregister(jobID) + libraryCacheManager.unregisterJob(jobID) } catch { case t: Throwable => log.error(t, s"Could not properly unregister job ${jobID} form the library cache.") @@ -453,7 +465,7 @@ object JobManager { (actorSystem, startActor(configuration)) } - def parseConfiguration(configuration: Configuration): (Int, Boolean, Long) = { + def parseConfiguration(configuration: Configuration): (Int, Boolean, Long, Int, Long) = { val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT) val profilingEnabled = configuration.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true) @@ -462,7 +474,14 @@ object JobManager { .LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000 - (archiveCount, profilingEnabled, cleanupInterval) + val executionRetries = configuration.getInteger(ConfigConstants + .DEFAULT_EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES); + + val delayBetweenRetries = 2 * configuration.getLong( + ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, + ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT) + + (archiveCount, profilingEnabled, cleanupInterval, executionRetries, delayBetweenRetries) } def startActor(configuration: Configuration)(implicit actorSystem: ActorSystem): ActorRef = { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala index eeb3828..b1c5f1f 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala @@ -200,14 +200,14 @@ object JobManagerMessages { * @param jobID * @param msg */ - case class JobResultCanceled(jobID: JobID, msg: String) + case class JobResultCanceled(jobID: JobID, msg: String) extends JobResult /** * Denotes a failed job execution. * @param jobID * @param msg */ - case class JobResultFailed(jobID: JobID, msg:String) + case class JobResultFailed(jobID: JobID, msg:String) extends JobResult sealed trait SubmissionResponse{ def jobID: JobID http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 6d0da27..43be786 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -25,6 +25,7 @@ import akka.actor.{ActorRef, ActorSystem} import org.apache.flink.api.common.io.FileOutputFormat import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.akka.AkkaUtils +import org.apache.flink.runtime.client.JobClient import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager import org.apache.flink.runtime.util.EnvironmentInformation import org.slf4j.LoggerFactory @@ -53,6 +54,8 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) { val (taskManagerActorSystems, taskManagerActors) = actorSystemsTaskManagers.unzip + val jobClientActorSystem = AkkaUtils.createActorSystem() + waitForTaskManagersToBeRegistered() def generateConfiguration(userConfiguration: Configuration): Configuration @@ -76,16 +79,34 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) { configuration) } - def getJobManager: ActorRef = { - jobManagerActor + def getJobClient(): ActorRef ={ + val config = new Configuration() + + config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, FlinkMiniCluster.HOSTNAME) + config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getJobManagerRPCPort) + + JobClient.startActorWithConfiguration(config)(jobClientActorSystem) } + def getJobClientActorSystem: ActorSystem = jobClientActorSystem + + def getJobManagerRPCPort: Int = { + configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1) + } + def getJobManager: ActorRef = { + jobManagerActor + } def getTaskManagers = { taskManagerActors } + def getTaskManagersAsJava = { + import collection.JavaConverters._ + taskManagerActors.asJava + } + def stop(): Unit = { LOG.info("Stopping FlinkMiniCluster.") shutdown() @@ -95,9 +116,11 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) { def shutdown(): Unit = { taskManagerActorSystems foreach { _.shutdown() } jobManagerActorSystem.shutdown() + jobClientActorSystem.shutdown() } def awaitTermination(): Unit = { + jobClientActorSystem.awaitTermination() taskManagerActorSystems foreach { _.awaitTermination()} jobManagerActorSystem.awaitTermination() } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index fb6e36b..edf16bb 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -18,22 +18,17 @@ package org.apache.flink.runtime.minicluster -import java.io.File - import akka.actor.{ActorRef, ActorSystem} -import org.apache.flink.configuration.{GlobalConfiguration, ConfigConstants, Configuration} +import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.client.JobClient import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.taskmanager.TaskManager import org.slf4j.LoggerFactory -import scopt.OptionParser class LocalFlinkMiniCluster(userConfiguration: Configuration) extends FlinkMiniCluster(userConfiguration){ - val actorSystem = AkkaUtils.createActorSystem() - override def generateConfiguration(userConfiguration: Configuration): Configuration = { val forNumberString = System.getProperty("forkNumber") @@ -87,33 +82,6 @@ FlinkMiniCluster(userConfiguration){ TaskManager.startActorWithConfiguration(FlinkMiniCluster.HOSTNAME, config, false)(system) } - - def getJobClient(): ActorRef ={ - val config = new Configuration() - - config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, FlinkMiniCluster.HOSTNAME) - config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getJobManagerRPCPort) - - JobClient.startActorWithConfiguration(config)(actorSystem) - } - - def getJobClientActorSystem: ActorSystem = actorSystem - - override def shutdown(): Unit = { - super.shutdown() - - actorSystem.shutdown() - } - - override def awaitTermination(): Unit = { - actorSystem.awaitTermination() - - super.awaitTermination() - } - - def getJobManagerRPCPort: Int = { - configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1) - } } object LocalFlinkMiniCluster{ http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 66d25c5..7004881 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -32,13 +32,14 @@ import org.apache.flink.core.fs.Path import org.apache.flink.runtime.ActorLogMessages import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.blob.BlobCache +import org.apache.flink.runtime.broadcast.BroadcastVariableManager import org.apache.flink.runtime.execution.{ExecutionState, RuntimeEnvironment} import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager} import org.apache.flink.runtime.executiongraph.ExecutionAttemptID import org.apache.flink.runtime.filecache.FileCache import org.apache.flink.runtime.instance.{InstanceConnectionInfo, HardwareDescription, InstanceID} -import org.apache.flink.runtime.io.disk.iomanager.IOManager +import org.apache.flink.runtime.io.disk.iomanager.{IOManagerAsync} import org.apache.flink.runtime.io.network.netty.NettyConnectionManager import org.apache.flink.runtime.io.network.{NetworkConnectionManager, LocalConnectionManager, ChannelManager} @@ -81,8 +82,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka val HEARTBEAT_INTERVAL = 1000 millisecond TaskManager.checkTempDirs(tmpDirPaths) - val ioManager = new IOManager(tmpDirPaths) + val ioManager = new IOManagerAsync(tmpDirPaths) val memoryManager = new DefaultMemoryManager(memorySize, numberOfSlots, pageSize) + val bcVarManager = new BroadcastVariableManager(); val hardwareDescription = HardwareDescription.extractFromSystem(memoryManager.getMemorySize) val fileCache = new FileCache() val runningTasks = scala.collection.mutable.HashMap[ExecutionAttemptID, Task]() @@ -232,7 +234,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka var jarsRegistered = false try { - libraryCacheManager.register(jobID, tdd.getRequiredJarFiles()); + libraryCacheManager.registerTask(jobID, executionID, tdd.getRequiredJarFiles()); jarsRegistered = true val userCodeClassLoader = libraryCacheManager.getClassLoader(jobID) @@ -252,7 +254,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka val splitProvider = new TaskInputSplitProvider(currentJobManager, jobID, vertexID, timeout) val env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, memoryManager, - ioManager, splitProvider,currentJobManager) + ioManager, splitProvider, currentJobManager, bcVarManager) task.setEnvironment(env) @@ -297,7 +299,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka if (jarsRegistered) { try { - libraryCacheManager.unregister(jobID) + libraryCacheManager.unregisterTask(jobID, executionID) } catch { case ioe: IOException => log.debug(s"Unregistering the execution ${executionID} caused an IOException.") @@ -350,7 +352,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka task.unregisterMemoryManager(memoryManager) try { - libraryCacheManager.unregister(task.getJobID) + libraryCacheManager.unregisterTask(task.getJobID, executionID) } catch { case ioe: IOException => log.error(ioe, s"Unregistering the execution ${executionID} caused an IOException.") @@ -543,17 +545,6 @@ object TaskManager { val numberOfSlots = if (slots > 0) slots else 1 - val configuredMemory: Long = configuration.getInteger(ConfigConstants - .TASK_MANAGER_MEMORY_SIZE_KEY, -1) - - val memorySize = if (configuredMemory > 0) { - configuredMemory << 20 - } else { - val fraction = configuration.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, - ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION) - (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag * fraction).toLong - } - val pageSize = configuration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE) @@ -585,6 +576,20 @@ object TaskManager { lowWaterMark, highWaterMark) } + val networkBufferMem = if(localExecution) 0 else numBuffers * bufferSize; + + val configuredMemory: Long = configuration.getInteger(ConfigConstants + .TASK_MANAGER_MEMORY_SIZE_KEY, -1) + + val memorySize = if (configuredMemory > 0) { + configuredMemory << 20 + } else { + val fraction = configuration.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, + ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION) + ((EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag - networkBufferMem ) * fraction) + .toLong + } + val memoryLoggingIntervalMs = configuration.getBoolean(ConfigConstants .TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD, http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java index 0cf3e02..71c0669 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java @@ -59,7 +59,8 @@ public class BlobLibraryCacheManagerTest { buf[0] += 1; keys.add(bc.put(buf)); - libraryCacheManager = new BlobLibraryCacheManager(server, GlobalConfiguration.getConfiguration()); + long cleanupInterval = 1000l; + libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval); libraryCacheManager.registerJob(jid, keys); List<File> files = new ArrayList<File>(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java deleted file mode 100644 index f1855f2..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.executiongraph; - -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getSimpleAcknowledgingTaskmanager; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.jobgraph.AbstractJobVertex; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobID; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; -import org.apache.flink.runtime.jobmanager.tasks.NoOpInvokable; -import org.apache.flink.runtime.protocols.TaskOperationProtocol; -import org.junit.Test; - -public class ExecutionGraphRestartTest { - - @Test - public void testRestartManually() { - final int NUM_TASKS = 31; - - try { - TaskOperationProtocol tm = getSimpleAcknowledgingTaskmanager(); - Instance instance = getInstance(tm); - - Scheduler scheduler = new Scheduler(); - scheduler.newInstanceAvailable(instance); - - // The job: - - final AbstractJobVertex sender = new AbstractJobVertex("Task"); - sender.setInvokableClass(NoOpInvokable.class); - sender.setParallelism(NUM_TASKS); - - final JobGraph jobGraph = new JobGraph("Pointwise Job", sender); - - ExecutionGraph eg = new ExecutionGraph(new JobID(), "test job", new Configuration()); - eg.setNumberOfRetriesLeft(0); - eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); - - assertEquals(JobStatus.CREATED, eg.getState()); - - eg.scheduleForExecution(scheduler); - assertEquals(JobStatus.RUNNING, eg.getState()); - - eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception")); - assertEquals(JobStatus.FAILED, eg.getState()); - - eg.restart(); - assertEquals(JobStatus.RUNNING, eg.getState()); - - for (ExecutionVertex v : eg.getAllExecutionVertices()) { - v.executionFinished(); - } - assertEquals(JobStatus.FINISHED, eg.getState()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testRestartSelf() { - final int NUM_TASKS = 31; - - try { - TaskOperationProtocol tm = getSimpleAcknowledgingTaskmanager(); - Instance instance = getInstance(tm); - - Scheduler scheduler = new Scheduler(); - scheduler.newInstanceAvailable(instance); - - // The job: - - final AbstractJobVertex sender = new AbstractJobVertex("Task"); - sender.setInvokableClass(NoOpInvokable.class); - sender.setParallelism(NUM_TASKS); - - final JobGraph jobGraph = new JobGraph("Pointwise Job", sender); - - ExecutionGraph eg = new ExecutionGraph(new JobID(), "test job", new Configuration()); - eg.setNumberOfRetriesLeft(1); - eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); - - assertEquals(JobStatus.CREATED, eg.getState()); - - eg.scheduleForExecution(scheduler); - assertEquals(JobStatus.RUNNING, eg.getState()); - - eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception")); - - // should have restarted itself - assertEquals(JobStatus.RUNNING, eg.getState()); - - for (ExecutionVertex v : eg.getAllExecutionVertices()) { - v.executionFinished(); - } - assertEquals(JobStatus.FINISHED, eg.getState()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index a9abd0c..fdeaafd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -94,17 +94,19 @@ public class ExecutionGraphTestUtils { // -------------------------------------------------------------------------------------------- // utility mocking methods // -------------------------------------------------------------------------------------------- - - public static Instance getInstance(final ActorRef taskManager) throws Exception { - return getInstance(top, 1); + + public static Instance getInstance(final ActorRef taskManager) throws + Exception { + return getInstance(taskManager, 1); } - - public static Instance getInstance(final TaskOperationProtocol top, int numSlots) throws Exception { - HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024); + + public static Instance getInstance(final ActorRef taskManager, final int numberOfSlots) throws + Exception { + HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024); InetAddress address = InetAddress.getByName("127.0.0.1"); InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001); - return new Instance(taskManager, connection, new InstanceID(), hardwareDescription, 1); + return new Instance(taskManager, connection, new InstanceID(), hardwareDescription, numberOfSlots); } public static class SimpleAcknowledgingTaskManager extends UntypedActor { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java index e6e71e6..29a1b3a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java @@ -24,6 +24,10 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.Instance; @@ -33,12 +37,26 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Matchers; public class ExecutionVertexSchedulingTest { + private static ActorSystem system; + @BeforeClass + public static void setup(){ + system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig()); + } + + @AfterClass + public static void teardown(){ + JavaTestKit.shutdownActorSystem(system); + system = null; + } @Test public void testSlotReleasedWhenScheduledImmediately() { @@ -108,10 +126,13 @@ public class ExecutionVertexSchedulingTest { } @Test - public void testScheduleToDeploy() { + public void testScheduleToRunning() { try { - // a slot than cannot be deployed to - final Instance instance = getInstance(ActorRef.noSender()); + TestingUtils.setCallingThreadDispatcher(system); + ActorRef tm = TestActorRef.create(system, Props.create(ExecutionGraphTestUtils + .SimpleAcknowledgingTaskManager.class)); + + final Instance instance = getInstance(tm); final AllocatedSlot slot = instance.allocateSlot(new JobID()); final ExecutionJobVertex ejv = getExecutionVertex(new JobVertexID()); @@ -124,11 +145,13 @@ public class ExecutionVertexSchedulingTest { // try to deploy to the slot vertex.scheduleForExecution(scheduler, false); - assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState()); + assertEquals(ExecutionState.RUNNING, vertex.getExecutionState()); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); + }finally{ + TestingUtils.setGlobalExecutionContext(); } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.java deleted file mode 100644 index 37bdaa3..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.executiongraph; - -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getSimpleAcknowledgingTaskmanager; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.jobgraph.AbstractJobVertex; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobID; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; -import org.apache.flink.runtime.jobmanager.tasks.NoOpInvokable; -import org.apache.flink.runtime.protocols.TaskOperationProtocol; -import org.junit.Test; - -public class TaskManagerLossFailsTasksTest { - - @Test - public void testTasksFailWhenTaskManagerLost() { - try { - TaskOperationProtocol tm1 = getSimpleAcknowledgingTaskmanager(); - TaskOperationProtocol tm2 = getSimpleAcknowledgingTaskmanager(); - - Instance instance1 = getInstance(tm1, 10); - Instance instance2 = getInstance(tm2, 10); - - Scheduler scheduler = new Scheduler(); - scheduler.newInstanceAvailable(instance1); - scheduler.newInstanceAvailable(instance2); - - // The job: - - final AbstractJobVertex sender = new AbstractJobVertex("Task"); - sender.setInvokableClass(NoOpInvokable.class); - sender.setParallelism(20); - - final JobGraph jobGraph = new JobGraph("Pointwise Job", sender); - - ExecutionGraph eg = new ExecutionGraph(new JobID(), "test job", new Configuration()); - eg.setNumberOfRetriesLeft(0); - eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); - - - assertEquals(JobStatus.CREATED, eg.getState()); - - eg.scheduleForExecution(scheduler); - assertEquals(JobStatus.RUNNING, eg.getState()); - - instance1.markDead(); - assertEquals(JobStatus.FAILING, eg.getState()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java index 3c371d6..c072e59 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java @@ -115,7 +115,7 @@ public class InstanceConnectionInfoTest { @Test public void testGetHostname0() { try { - final InstanceConnectionInfo info1 = PowerMockito.spy(new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 10523, 19871)); + final InstanceConnectionInfo info1 = PowerMockito.spy(new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871)); Whitebox.setInternalState(info1, "fqdnHostName", "worker2.cluster.mycompany.com"); Assert.assertEquals("worker2", info1.getHostname()); } catch (Exception e) { @@ -127,7 +127,7 @@ public class InstanceConnectionInfoTest { @Test public void testGetHostname1() { try { - final InstanceConnectionInfo info1 = PowerMockito.spy(new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 10523, 19871)); + final InstanceConnectionInfo info1 = PowerMockito.spy(new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871)); Whitebox.setInternalState(info1, "fqdnHostName", "worker10"); Assert.assertEquals("worker10", info1.getHostname()); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java deleted file mode 100644 index be3e765..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager; - -import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobManager; -import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.client.AbstractJobResult; -import org.apache.flink.runtime.client.JobSubmissionResult; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.instance.LocalInstanceManager; -import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool; -import org.apache.flink.runtime.jobgraph.AbstractJobVertex; -import org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; -import org.apache.flink.runtime.jobmanager.tasks.ReceiverBlockingOnce; -import org.apache.flink.runtime.jobmanager.tasks.ReceiverFailingOnce; -import org.apache.flink.runtime.jobmanager.tasks.Sender; -import org.junit.Test; - -/** - * This test is intended to cover the basic functionality of the {@link JobManager}. - */ -public class RecoveryITCase { - - @Test - public void testForwardJob() { - - ReceiverFailingOnce.resetFailedBefore(); - - final int NUM_TASKS = 31; - - JobManager jm = null; - - try { - final AbstractJobVertex sender = new AbstractJobVertex("Sender"); - final AbstractJobVertex receiver = new AbstractJobVertex("Receiver"); - - sender.setInvokableClass(Sender.class); - receiver.setInvokableClass(ReceiverFailingOnce.class); - - sender.setParallelism(NUM_TASKS); - receiver.setParallelism(NUM_TASKS); - - receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE); - - final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver); - jobGraph.setNumberOfExecutionRetries(1); - - jm = startJobManager(2 * NUM_TASKS); - - final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager()) - .getTaskManagers()[0].getChannelManager().getGlobalBufferPool(); - - JobSubmissionResult result = jm.submitJob(jobGraph); - - if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) { - System.out.println(result.getDescription()); - } - assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode()); - - // monitor the execution - ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID()); - - if (eg != null) { - eg.waitForJobEnd(); - - if (eg.getState() != JobStatus.FINISHED) { - Throwable t = eg.getFailureCause(); - String message = null; - - if (t != null) { - t.printStackTrace(); - message = t.getMessage(); - } - fail("Execution failed despite recovery: " + message); - } - } - else { - // already done, that was fast; - } - - // make sure that in any case, the network buffers are all returned - waitForTaskThreadsToBeTerminated(); - assertEquals(bp.numBuffers(), bp.numAvailableBuffers()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { - if (jm != null) { - jm.shutdown(); - } - } - } - - @Test - public void testForwardJobWithSlotSharing() { - - ReceiverFailingOnce.resetFailedBefore(); - - final int NUM_TASKS = 31; - - JobManager jm = null; - - try { - final AbstractJobVertex sender = new AbstractJobVertex("Sender"); - final AbstractJobVertex receiver = new AbstractJobVertex("Receiver"); - - sender.setInvokableClass(Sender.class); - receiver.setInvokableClass(ReceiverFailingOnce.class); - - sender.setParallelism(NUM_TASKS); - receiver.setParallelism(NUM_TASKS); - - receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE); - - SlotSharingGroup sharingGroup = new SlotSharingGroup(); - sender.setSlotSharingGroup(sharingGroup); - receiver.setSlotSharingGroup(sharingGroup); - - final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver); - jobGraph.setNumberOfExecutionRetries(1); - - jm = startJobManager(NUM_TASKS); - - final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager()) - .getTaskManagers()[0].getChannelManager().getGlobalBufferPool(); - - JobSubmissionResult result = jm.submitJob(jobGraph); - - if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) { - System.out.println(result.getDescription()); - } - assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode()); - - // monitor the execution - ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID()); - - if (eg != null) { - eg.waitForJobEnd(); - - if (eg.getState() != JobStatus.FINISHED) { - Throwable t = eg.getFailureCause(); - String message = null; - - if (t != null) { - t.printStackTrace(); - message = t.getMessage(); - } - fail("Execution failed despite recovery: " + message); - } - } - else { - // already done, that was fast; - } - - // make sure that in any case, the network buffers are all returned - waitForTaskThreadsToBeTerminated(); - assertEquals(bp.numBuffers(), bp.numAvailableBuffers()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { - if (jm != null) { - jm.shutdown(); - } - } - } - - @Test - public void testRecoverTaskManagerFailure() { - - final int NUM_TASKS = 31; - - JobManager jm = null; - - try { - final AbstractJobVertex sender = new AbstractJobVertex("Sender"); - final AbstractJobVertex receiver = new AbstractJobVertex("Receiver"); - - sender.setInvokableClass(Sender.class); - receiver.setInvokableClass(ReceiverBlockingOnce.class); - sender.setParallelism(NUM_TASKS); - receiver.setParallelism(NUM_TASKS); - - receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE); - - SlotSharingGroup sharingGroup = new SlotSharingGroup(); - sender.setSlotSharingGroup(sharingGroup); - receiver.setSlotSharingGroup(sharingGroup); - - final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver); - jobGraph.setNumberOfExecutionRetries(1); - - // make sure we have fast heartbeats and failure detection - Configuration cfg = new Configuration(); - cfg.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 3000); - cfg.setInteger(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, 1000); - - jm = startJobManager(2, NUM_TASKS, cfg); - - JobSubmissionResult result = jm.submitJob(jobGraph); - - if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) { - System.out.println(result.getDescription()); - } - assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode()); - - // monitor the execution - ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID()); - - // wait for a bit until all is running, make sure the second attempt does not block - Thread.sleep(300); - ReceiverBlockingOnce.setShouldNotBlock(); - - // shutdown one of the taskmanagers - ((LocalInstanceManager) jm.getInstanceManager()).getTaskManagers()[0].shutdown(); - - // wait for the recovery to do its work - if (eg != null) { - eg.waitForJobEnd(); - - if (eg.getState() != JobStatus.FINISHED) { - Throwable t = eg.getFailureCause(); - String message = null; - - if (t != null) { - t.printStackTrace(); - message = t.getMessage(); - } - fail("Execution failed despite recovery: " + message); - } - } - else { - // already done, that was fast; - } - - // make sure that in any case, the network buffers are all returned - waitForTaskThreadsToBeTerminated(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { - if (jm != null) { - jm.shutdown(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java deleted file mode 100644 index 3425842..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager.tasks; - -import org.apache.flink.runtime.io.network.api.RecordReader; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.types.IntegerRecord; - -public final class ReceiverBlockingOnce extends AbstractInvokable { - - private static boolean shouldBlock = true; - - private RecordReader<IntegerRecord> reader; - - @Override - public void registerInputOutput() { - reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class); - } - - @Override - public void invoke() throws Exception { - if (shouldBlock) { - - Object o = new Object(); - synchronized (o) { - o.wait(); - } - } - - while (reader.next() != null); - } - - public static void setShouldNotBlock() { - shouldBlock = false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java deleted file mode 100644 index 3fad6b1..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager.tasks; - -import org.apache.flink.runtime.io.network.api.RecordReader; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.types.IntegerRecord; - -public final class ReceiverFailingOnce extends AbstractInvokable { - - private static boolean hasFailedBefore = false; - - private RecordReader<IntegerRecord> reader; - - @Override - public void registerInputOutput() { - reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class); - } - - @Override - public void invoke() throws Exception { - if (!hasFailedBefore && getEnvironment().getIndexInSubtaskGroup() == 0) { - hasFailedBefore = true; - throw new Exception("Test exception"); - } - - while (reader.next() != null); - } - - - public static void resetFailedBefore() { - hasFailedBefore = false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index e00439b..4b0a6ec 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -265,7 +265,7 @@ public class TaskTest { RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, getClass().getClassLoader(), mock(MemoryManager.class), mock(IOManager.class), mock(InputSplitProvider.class), - mock(ActorRef.class)); + mock(ActorRef.class), new BroadcastVariableManager()); task.setEnvironment(env); @@ -303,7 +303,7 @@ public class TaskTest { RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, getClass().getClassLoader(), mock(MemoryManager.class), mock(IOManager.class), mock(InputSplitProvider.class), - mock(ActorRef.class)); + mock(ActorRef.class), new BroadcastVariableManager()); task.setEnvironment(env); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala new file mode 100644 index 0000000..f4ce7b0 --- /dev/null +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph + +import akka.actor.{Props, ActorSystem} +import akka.testkit.{TestKit} +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils +import org.apache.flink.runtime.jobgraph.{JobStatus, JobID, JobGraph, AbstractJobVertex} +import org.apache.flink.runtime.jobmanager.Tasks +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler +import org.apache.flink.runtime.testingUtils.TestingUtils +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} + +class ExecutionGraphRestartTest(_system: ActorSystem) extends TestKit(_system) with WordSpecLike +with Matchers with BeforeAndAfterAll { + + def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig)) + + override def afterAll(): Unit = { + TestKit.shutdownActorSystem(system) + } + + val NUM_TASKS = 31 + + "The execution graph" must { + "be manually restartable" in { + try { + val tm = system.actorOf(Props(classOf[ExecutionGraphTestUtils + .SimpleAcknowledgingTaskManager], "TaskManager")) + val instance = ExecutionGraphTestUtils.getInstance(tm) + + val scheduler = new Scheduler + scheduler.newInstanceAvailable(instance) + + val sender = new AbstractJobVertex("Task") + sender.setInvokableClass(classOf[Tasks.NoOpInvokable]) + sender.setParallelism(NUM_TASKS) + + val jobGraph = new JobGraph("Pointwise job", sender) + + val eg = new ExecutionGraph(new JobID(), "test job", new Configuration()) + eg.setNumberOfRetriesLeft(0) + eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources) + + eg.getState should equal(JobStatus.CREATED) + + eg.scheduleForExecution(scheduler) + eg.getState should equal(JobStatus.RUNNING) + + eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test Exception")) + eg.getState should equal(JobStatus.FAILED) + + eg.restart() + eg.getState should equal(JobStatus.RUNNING) + + import collection.JavaConverters._ + for (vertex <- eg.getAllExecutionVertices.asScala) { + vertex.executionFinished() + } + + eg.getState should equal(JobStatus.FINISHED) + } catch { + case t: Throwable => + t.printStackTrace() + fail(t.getMessage) + } + } + + "restart itself automatically" in { + try { + val tm = system.actorOf(Props + (classOf[ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager], "TaskManager")) + val instance = ExecutionGraphTestUtils.getInstance(tm) + + val scheduler = new Scheduler + scheduler.newInstanceAvailable(instance) + + val sender = new AbstractJobVertex("Task") + sender.setInvokableClass(classOf[Tasks.NoOpInvokable]) + sender.setParallelism(NUM_TASKS) + + val jobGraph = new JobGraph("Pointwise job", sender) + + val eg = new ExecutionGraph(new JobID(), "Test job", new Configuration()) + eg.setNumberOfRetriesLeft(1) + eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources) + + eg.getState should equal(JobStatus.CREATED) + + eg.scheduleForExecution(scheduler) + eg.getState should equal(JobStatus.RUNNING) + + eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test Exception")) + + eg.getState should equal(JobStatus.RUNNING) + + import collection.JavaConverters._ + for (vertex <- eg.getAllExecutionVertices.asScala) { + vertex.executionFinished() + } + + eg.getState should equal(JobStatus.FINISHED) + }catch{ + case t: Throwable => + t.printStackTrace() + fail(t.getMessage) + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala new file mode 100644 index 0000000..9884bca --- /dev/null +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph + +import akka.actor.{Props, ActorSystem} +import akka.testkit.TestKit +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionGraphTestUtils} +import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils +.SimpleAcknowledgingTaskManager +import org.apache.flink.runtime.jobgraph.{JobStatus, JobID, JobGraph, AbstractJobVertex} +import org.apache.flink.runtime.jobmanager.Tasks +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler +import org.apache.flink.runtime.testingUtils.TestingUtils +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} + +class TaskManagerLossFailsTasksTest(_system: ActorSystem) extends TestKit(_system) with +WordSpecLike with Matchers with BeforeAndAfterAll { + + def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig)) + + override def afterAll(): Unit = { + TestKit.shutdownActorSystem(system) + } + + "A task manager loss" must { + "fail the assigned tasks" in { + try { + val tm1 = system.actorOf(Props(classOf[SimpleAcknowledgingTaskManager], "TaskManager1")) + val tm2 = system.actorOf(Props(classOf[SimpleAcknowledgingTaskManager], "TaskManager2")) + + val instance1 = ExecutionGraphTestUtils.getInstance(tm1, 10) + val instance2 = ExecutionGraphTestUtils.getInstance(tm2, 10) + + val scheduler = new Scheduler + scheduler.newInstanceAvailable(instance1) + scheduler.newInstanceAvailable(instance2) + + val sender = new AbstractJobVertex("Task") + sender.setInvokableClass(classOf[Tasks.NoOpInvokable]) + sender.setParallelism(20) + + val jobGraph = new JobGraph("Pointwise job", sender) + + val eg = new ExecutionGraph(new JobID(), "test job", new Configuration()) + eg.setNumberOfRetriesLeft(0) + eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources) + + eg.getState should equal(JobStatus.CREATED) + + eg.scheduleForExecution(scheduler) + eg.getState should equal(JobStatus.RUNNING) + + instance1.markDead() + eg.getState should equal(JobStatus.FAILING) + }catch{ + case t:Throwable => + t.printStackTrace() + fail(t.getMessage) + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala new file mode 100644 index 0000000..cc96f4b --- /dev/null +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager + +import akka.actor.{PoisonPill, ActorSystem} +import akka.testkit.{ImplicitSender, TestKit} +import org.apache.flink.runtime.jobgraph.{JobGraph, DistributionPattern, AbstractJobVertex} +import org.apache.flink.runtime.jobmanager.Tasks.{BlockingOnceReceiver, FailingOnceReceiver} +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup +import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, SubmissionSuccess, +SubmitJob} +import org.apache.flink.runtime.testingUtils.TestingUtils +import org.junit.runner.RunWith +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} +import org.scalatest.junit.JUnitRunner + +@RunWith(classOf[JUnitRunner]) +class RecoveryITCase(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with +WordSpecLike with Matchers with BeforeAndAfterAll { + def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig)) + + override def afterAll: Unit = { + TestKit.shutdownActorSystem(system) + } + + val NUM_TASKS = 31 + + "The recovery" must { + "recover once failing forward job" in { + FailingOnceReceiver.failed = false + + val sender = new AbstractJobVertex("Sender"); + val receiver = new AbstractJobVertex("Receiver"); + + sender.setInvokableClass(classOf[Tasks.Sender]) + receiver.setInvokableClass(classOf[Tasks.FailingOnceReceiver]) + + sender.setParallelism(NUM_TASKS) + receiver.setParallelism(NUM_TASKS) + + receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) + + val jobGraph = new JobGraph("Pointwise job", sender, receiver) + jobGraph.setNumberOfExecutionRetries(1) + + val cluster = TestingUtils.startTestingCluster(2 * NUM_TASKS) + val jm = cluster.getJobManager + + try { + within(TestingUtils.TESTING_DURATION){ + jm ! SubmitJob(jobGraph) + + expectMsg(SubmissionSuccess(jobGraph.getJobID)) + + val result = expectMsgType[JobResultSuccess] + + result.jobID should equal(jobGraph.getJobID) + } + } catch { + case t: Throwable => + t.printStackTrace() + fail(t.getMessage) + } finally{ + cluster.stop() + } + } + + "recover once failing forward job with slot sharing" in { + FailingOnceReceiver.failed = false + + val sender = new AbstractJobVertex("Sender"); + val receiver = new AbstractJobVertex("Receiver"); + + sender.setInvokableClass(classOf[Tasks.Sender]) + receiver.setInvokableClass(classOf[Tasks.FailingOnceReceiver]) + + sender.setParallelism(NUM_TASKS) + receiver.setParallelism(NUM_TASKS) + + receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) + + val sharingGroup = new SlotSharingGroup + sender.setSlotSharingGroup(sharingGroup) + receiver.setSlotSharingGroup(sharingGroup) + + val jobGraph = new JobGraph("Pointwise job", sender, receiver) + jobGraph.setNumberOfExecutionRetries(1) + + val cluster = TestingUtils.startTestingCluster(NUM_TASKS) + val jm = cluster.getJobManager + + try { + within(TestingUtils.TESTING_DURATION){ + jm ! SubmitJob(jobGraph) + + expectMsg(SubmissionSuccess(jobGraph.getJobID)) + + val result = expectMsgType[JobResultSuccess] + + result.jobID should equal(jobGraph.getJobID) + } + } catch { + case t: Throwable => + t.printStackTrace() + fail(t.getMessage) + } finally{ + cluster.stop() + } + } + + "recover a task manager failure" in { + BlockingOnceReceiver.blocking = true + + val sender = new AbstractJobVertex("Sender"); + val receiver = new AbstractJobVertex("Receiver"); + + sender.setInvokableClass(classOf[Tasks.Sender]) + receiver.setInvokableClass(classOf[Tasks.BlockingOnceReceiver]) + + sender.setParallelism(NUM_TASKS) + receiver.setParallelism(NUM_TASKS) + + receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE) + + val sharingGroup = new SlotSharingGroup + sender.setSlotSharingGroup(sharingGroup) + receiver.setSlotSharingGroup(sharingGroup) + + val jobGraph = new JobGraph("Pointwise job", sender, receiver) + jobGraph.setNumberOfExecutionRetries(1) + + val cluster = TestingUtils.startTestingCluster(NUM_TASKS, 2) + + val jm = cluster.getJobManager + val taskManagers = cluster.getTaskManagers + + try { + within(TestingUtils.TESTING_DURATION){ + jm ! SubmitJob(jobGraph) + + expectMsg(SubmissionSuccess(jobGraph.getJobID)) + + Thread.sleep(300) + BlockingOnceReceiver.blocking = false + taskManagers(0) ! PoisonPill + + val result = expectMsgType[JobResultSuccess] + + result.jobID should equal(jobGraph.getJobID) + } + } catch { + case t: Throwable => + t.printStackTrace() + fail(t.getMessage) + } finally{ + cluster.stop() + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala index bee9578..3306374 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala @@ -76,6 +76,31 @@ object Tasks { } } + class FailingOnceReceiver extends Receiver { + import FailingOnceReceiver.failed + + override def invoke(): Unit = { + if(!failed && getEnvironment.getIndexInSubtaskGroup == 0){ + failed = true + throw new Exception("Test exception.") + }else{ + super.invoke() + } + } + } + + object FailingOnceReceiver{ + var failed = false + } + + class BlockingOnceReceiver extends Receiver { + + } + + object BlockingOnceReceiver{ + var blocking = true + } + class AgnosticReceiver extends AbstractInvokable { var reader: RecordReader[IntegerRecord] = _ http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 480bc1b..9961ada 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -47,6 +47,6 @@ class TestingCluster(userConfiguration: Configuration) extends FlinkMiniCluster( TaskManager.parseConfiguration(FlinkMiniCluster.HOSTNAME, configuration, true) system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig, - networkConnectionConfig)), TaskManager.TASK_MANAGER_NAME + index) + networkConnectionConfig) with TestingTaskManager), TaskManager.TASK_MANAGER_NAME + index) } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala index 768a5b0..5c6cca1 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala @@ -22,7 +22,7 @@ import akka.actor.ActorRef import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.{ActorLogMessages} import org.apache.flink.runtime.executiongraph.ExecutionAttemptID -import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.{NotifyWhenTaskRemoved, ResponseRunningTasks, RequestRunningTasks} +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._ import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask trait TestingTaskManager extends ActorLogMessages { @@ -50,5 +50,8 @@ trait TestingTaskManager extends ActorLogMessages { case Some(actors) => for(actor <- actors) actor ! true case None => } + case RequestBroadcastVariablesWithReferences => { + sender() ! ResponseBroadcastVariablesWithReferences(bcVarManager.getNumberOfVariablesWithReferences) + } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala index 674f0d7..24d7e5c 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala @@ -21,12 +21,13 @@ package org.apache.flink.runtime.testingUtils import org.apache.flink.runtime.executiongraph.ExecutionAttemptID import org.apache.flink.runtime.taskmanager.Task -import scala.collection.convert.DecorateAsJava - -object TestingTaskManagerMessages extends DecorateAsJava{ +object TestingTaskManagerMessages{ case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID) case object RequestRunningTasks case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){ + import collection.JavaConverters._ def asJava: java.util.Map[ExecutionAttemptID, Task] = tasks.asJava } + case object RequestBroadcastVariablesWithReferences + case class ResponseBroadcastVariablesWithReferences(number: Int) } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 126ae10..c1565c7 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -71,6 +71,7 @@ object TestingUtils { val config = new Configuration() config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers) + config.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 1000) val cluster = new TestingCluster(config) cluster } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-test-utils/pom.xml ---------------------------------------------------------------------- diff --git a/flink-test-utils/pom.xml b/flink-test-utils/pom.xml index b3cbae6..d77318e 100644 --- a/flink-test-utils/pom.xml +++ b/flink-test-utils/pom.xml @@ -52,6 +52,12 @@ under the License. </dependency> <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${project.version}</version> </dependency>
