Repository: flink Updated Branches: refs/heads/master bc9d52391 -> 78f2a1586
http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java index 4597e3b..9f39de1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java @@ -19,39 +19,34 @@ package org.apache.flink.runtime.io.network; import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.DummyActorGateway; -import org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.netty.NettyConfig; import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.taskmanager.ActorGatewayResultPartitionConsumableNotifier; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; +import org.apache.flink.runtime.taskmanager.JobManagerCommunicationFactory; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.util.NetUtils; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import scala.Some; -import scala.Tuple2; import scala.concurrent.duration.FiniteDuration; import scala.concurrent.impl.Promise; -import java.net.InetAddress; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; @@ -61,100 +56,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class NetworkEnvironmentTest { - - @Test - public void testAssociateDisassociate() { - final int BUFFER_SIZE = 1024; - final int NUM_BUFFERS = 20; - - final int port; - try { - port = NetUtils.getAvailablePort(); - } - catch (Throwable t) { - // ignore - return; - } - - try { - NettyConfig nettyConf = new NettyConfig(InetAddress.getLocalHost(), port, BUFFER_SIZE, 1, new Configuration()); - NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration( - NUM_BUFFERS, BUFFER_SIZE, MemoryType.HEAP, - IOManager.IOMode.SYNC, 0, 0, 0, new Some<>(nettyConf), - new Tuple2<>(0, 0)); - - NetworkEnvironment env = new NetworkEnvironment( - TestingUtils.defaultExecutionContext(), - new FiniteDuration(30, TimeUnit.SECONDS), - config, - new InstanceConnectionInfo(InetAddress.getLocalHost(), port)); - - assertFalse(env.isShutdown()); - assertFalse(env.isAssociated()); - - // pool must be started already - assertNotNull(env.getNetworkBufferPool()); - assertEquals(NUM_BUFFERS, env.getNetworkBufferPool().getTotalNumberOfMemorySegments()); - - // others components are still shut down - assertNull(env.getConnectionManager()); - assertNull(env.getPartitionConsumableNotifier()); - assertNull(env.getTaskEventDispatcher()); - assertNull(env.getPartitionManager()); - - // associate the environment with some mock actors - env.associateWithTaskManagerAndJobManager( - DummyActorGateway.INSTANCE, - DummyActorGateway.INSTANCE); - - assertNotNull(env.getConnectionManager()); - assertNotNull(env.getPartitionConsumableNotifier()); - assertNotNull(env.getTaskEventDispatcher()); - assertNotNull(env.getPartitionManager()); - - // allocate some buffer pool - BufferPool localPool = env.getNetworkBufferPool().createBufferPool(10, false); - assertNotNull(localPool); - - // disassociate - env.disassociate(); - - assertNull(env.getConnectionManager()); - assertNull(env.getPartitionConsumableNotifier()); - assertNull(env.getTaskEventDispatcher()); - assertNull(env.getPartitionManager()); - - assertNotNull(env.getNetworkBufferPool()); - assertTrue(localPool.isDestroyed()); - - // associate once again - env.associateWithTaskManagerAndJobManager( - DummyActorGateway.INSTANCE, - DummyActorGateway.INSTANCE - ); - - assertNotNull(env.getConnectionManager()); - assertNotNull(env.getPartitionConsumableNotifier()); - assertNotNull(env.getTaskEventDispatcher()); - assertNotNull(env.getPartitionManager()); - - // shutdown for good - env.shutdown(); - - assertTrue(env.isShutdown()); - assertFalse(env.isAssociated()); - assertNull(env.getConnectionManager()); - assertNull(env.getPartitionConsumableNotifier()); - assertNull(env.getTaskEventDispatcher()); - assertNull(env.getPartitionManager()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - /** * Registers a task with an eager and non-eager partition at the network * environment and verifies that there is exactly on schedule or update @@ -164,45 +65,61 @@ public class NetworkEnvironmentTest { @SuppressWarnings("unchecked") public void testEagerlyDeployConsumers() throws Exception { // Mock job manager => expected interactions will be verified - ActorGateway jobManager = mock(ActorGateway.class); + final ActorGateway jobManager = mock(ActorGateway.class); when(jobManager.ask(anyObject(), any(FiniteDuration.class))) .thenReturn(new Promise.DefaultPromise<>().future()); // Network environment setup NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration( - 20, - 1024, - MemoryType.HEAP, - IOManager.IOMode.SYNC, - 0, - 0, - 0, - Some.<NettyConfig>empty(), - new Tuple2<>(0, 0)); + 20, + 1024, + MemoryType.HEAP, + IOManager.IOMode.SYNC, + 0, + 0, + 0, + Some.<NettyConfig>empty(), + 0, + 0); NetworkEnvironment env = new NetworkEnvironment( - TestingUtils.defaultExecutionContext(), - new FiniteDuration(30, TimeUnit.SECONDS), - config, - new InstanceConnectionInfo(InetAddress.getLocalHost(), 12232)); - - // Associate the environment with the mock actors - env.associateWithTaskManagerAndJobManager( - jobManager, - DummyActorGateway.INSTANCE); + new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize(), config.memoryType()), + new LocalConnectionManager(), + new ResultPartitionManager(), + new TaskEventDispatcher(), + new KvStateRegistry(), + null, + config.ioMode(), + config.partitionRequestInitialBackoff(), + config.partitinRequestMaxBackoff()); + + env.start(); + + JobManagerCommunicationFactory jobManagerCommunicationFactory = mock(JobManagerCommunicationFactory.class); + + when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenAnswer(new Answer<ResultPartitionConsumableNotifier>() { + @Override + public ResultPartitionConsumableNotifier answer(InvocationOnMock invocation) throws Throwable { + return new ActorGatewayResultPartitionConsumableNotifier( + TestingUtils.defaultExecutionContext(), + jobManager, + (Task)invocation.getArguments()[0], + new FiniteDuration(30, TimeUnit.SECONDS)); + } + }); // Register mock task JobID jobId = new JobID(); + Task mockTask = mock(Task.class); ResultPartition[] partitions = new ResultPartition[2]; - partitions[0] = createPartition("p1", jobId, true, env); - partitions[1] = createPartition("p2", jobId, false, env); + partitions[0] = createPartition(mockTask, "p1", jobId, true, env, jobManagerCommunicationFactory); + partitions[1] = createPartition(mockTask, "p2", jobId, false, env, jobManagerCommunicationFactory); ResultPartitionWriter[] writers = new ResultPartitionWriter[2]; writers[0] = new ResultPartitionWriter(partitions[0]); writers[1] = new ResultPartitionWriter(partitions[1]); - Task mockTask = mock(Task.class); when(mockTask.getAllInputGates()).thenReturn(new SingleInputGate[0]); when(mockTask.getAllWriters()).thenReturn(writers); when(mockTask.getProducedPartitions()).thenReturn(partitions); @@ -221,10 +138,12 @@ public class NetworkEnvironmentTest { * Helper to create a mock result partition. */ private static ResultPartition createPartition( - String name, - JobID jobId, - boolean eagerlyDeployConsumers, - NetworkEnvironment env) { + Task owningTask, + String name, + JobID jobId, + boolean eagerlyDeployConsumers, + NetworkEnvironment env, + JobManagerCommunicationFactory jobManagerCommunicationFactory) { return new ResultPartition( name, @@ -233,8 +152,8 @@ public class NetworkEnvironmentTest { ResultPartitionType.PIPELINED, eagerlyDeployConsumers, 1, - env.getPartitionManager(), - env.getPartitionConsumableNotifier(), + env.getResultPartitionManager(), + jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(owningTask), mock(IOManager.class), env.getDefaultIOMode()); } http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java index 0868398..8884b29 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java @@ -23,7 +23,6 @@ import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.junit.Test; -import scala.Tuple2; import java.io.IOException; @@ -103,10 +102,11 @@ public class InputChannelTest { private InputChannel createInputChannel(int initialBackoff, int maxBackoff) { return new MockInputChannel( - mock(SingleInputGate.class), - 0, - new ResultPartitionID(), - new Tuple2<Integer, Integer>(initialBackoff, maxBackoff)); + mock(SingleInputGate.class), + 0, + new ResultPartitionID(), + initialBackoff, + maxBackoff); } // --------------------------------------------------------------------------------------------- @@ -117,9 +117,10 @@ public class InputChannelTest { SingleInputGate inputGate, int channelIndex, ResultPartitionID partitionId, - Tuple2<Integer, Integer> initialAndMaxBackoff) { + int initialBackoff, + int maxBackoff) { - super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, new SimpleCounter()); + super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, new SimpleCounter()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index f91a4ba..18d9073 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -267,13 +267,14 @@ public class LocalInputChannelTest { throws IOException, InterruptedException { return new LocalInputChannel( - inputGate, - 0, - new ResultPartitionID(), - partitionManager, - mock(TaskEventDispatcher.class), - initialAndMaxRequestBackoff, - new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + inputGate, + 0, + new ResultPartitionID(), + partitionManager, + mock(TaskEventDispatcher.class), + initialAndMaxRequestBackoff._1(), + initialAndMaxRequestBackoff._2(), + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index 9eb49ef..9a79ff8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -299,12 +299,13 @@ public class RemoteInputChannelTest { .thenReturn(partitionRequestClient); return new RemoteInputChannel( - inputGate, - 0, - new ResultPartitionID(), - mock(ConnectionID.class), - connectionManager, - initialAndMaxRequestBackoff, - new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + inputGate, + 0, + new ResultPartitionID(), + mock(ConnectionID.class), + connectionManager, + initialAndMaxRequestBackoff._1(), + initialAndMaxRequestBackoff._2(), + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 05427a1..f55fee5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -128,7 +128,7 @@ public class SingleInputGateTest { // Unknown ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()); - InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), new Tuple2<Integer, Integer>(0, 0), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), 0, 0, new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); // Set channels inputGate.setInputChannel(localPartitionId.getPartitionId(), local); @@ -174,13 +174,15 @@ public class SingleInputGateTest { ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); InputChannel unknown = new UnknownInputChannel( - inputGate, - 0, - new ResultPartitionID(), - partitionManager, - new TaskEventDispatcher(), - new LocalConnectionManager(), - new Tuple2<Integer, Integer>(0, 0), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + inputGate, + 0, + new ResultPartitionID(), + partitionManager, + new TaskEventDispatcher(), + new LocalConnectionManager(), + 0, + 0, + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown); @@ -213,14 +215,15 @@ public class SingleInputGateTest { new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); InputChannel unknown = new UnknownInputChannel( - inputGate, - 0, - new ResultPartitionID(), - new ResultPartitionManager(), - new TaskEventDispatcher(), - new LocalConnectionManager(), - new Tuple2<Integer, Integer>(0, 0), - new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + inputGate, + 0, + new ResultPartitionID(), + new ResultPartitionManager(), + new TaskEventDispatcher(), + new LocalConnectionManager(), + 0, + 0, + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown); http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index ab4ca3b..9501c7c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -150,12 +150,14 @@ public class TaskAsyncCallTest { ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class); NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class); - when(networkEnvironment.getPartitionManager()).thenReturn(partitionManager); - when(networkEnvironment.getPartitionConsumableNotifier()).thenReturn(consumableNotifier); + when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager); when(networkEnvironment.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC); when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) .thenReturn(mock(TaskKvStateRegistry.class)); + JobManagerCommunicationFactory jobManagerCommunicationFactory = mock(JobManagerCommunicationFactory.class); + when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenReturn(consumableNotifier); + TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(), new SerializedValue<>(new ExecutionConfig()), @@ -170,17 +172,18 @@ public class TaskAsyncCallTest { ActorGateway taskManagerGateway = DummyActorGateway.INSTANCE; return new Task(tdd, - mock(MemoryManager.class), - mock(IOManager.class), - networkEnvironment, - mock(BroadcastVariableManager.class), - taskManagerGateway, - DummyActorGateway.INSTANCE, - new FiniteDuration(60, TimeUnit.SECONDS), - libCache, - mock(FileCache.class), - new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")), - mock(TaskMetricGroup.class)); + mock(MemoryManager.class), + mock(IOManager.class), + networkEnvironment, + jobManagerCommunicationFactory, + mock(BroadcastVariableManager.class), + taskManagerGateway, + DummyActorGateway.INSTANCE, + new FiniteDuration(60, TimeUnit.SECONDS), + libCache, + mock(FileCache.class), + new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")), + mock(TaskMetricGroup.class)); } public static class CheckpointsInOrderInvokable extends AbstractInvokable implements StatefulTask { http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 147a3e0..3371c49 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -35,8 +35,12 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.io.network.LocalConnectionManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.io.network.TaskEventDispatcher; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.netty.NettyConfig; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -44,11 +48,11 @@ import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.TaskManagerMessages; +import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.junit.Test; import scala.Option; -import scala.Tuple2; import scala.concurrent.duration.FiniteDuration; import java.net.InetAddress; @@ -99,32 +103,40 @@ public class TaskManagerComponentsStartupShutdownTest { final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration( 32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, 0, - Option.<NettyConfig>empty(), new Tuple2<Integer, Integer>(0, 0)); + Option.<NettyConfig>empty(), 0, 0); final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost(), 10000); final MemoryManager memManager = new MemoryManager(32 * BUFFER_SIZE, 1, BUFFER_SIZE, MemoryType.HEAP, false); final IOManager ioManager = new IOManagerAsync(TMP_DIR); final NetworkEnvironment network = new NetworkEnvironment( - TestingUtils.defaultExecutionContext(), - timeout, - netConf, - connectionInfo); + new NetworkBufferPool(netConf.numNetworkBuffers(), netConf.networkBufferSize(), netConf.memoryType()), + new LocalConnectionManager(), + new ResultPartitionManager(), + new TaskEventDispatcher(), + new KvStateRegistry(), + null, + netConf.ioMode(), + netConf.partitionRequestInitialBackoff(), + netConf.partitinRequestMaxBackoff()); + + network.start(); + final int numberOfSlots = 1; LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManager.path().toString()); // create the task manager final Props tmProps = Props.create( - TaskManager.class, - tmConfig, - ResourceID.generate(), - connectionInfo, - memManager, - ioManager, - network, - numberOfSlots, - leaderRetrievalService); + TaskManager.class, + tmConfig, + ResourceID.generate(), + connectionInfo, + memManager, + ioManager, + network, + numberOfSlots, + leaderRetrievalService); final ActorRef taskManager = actorSystem.actorOf(tmProps); @@ -142,9 +154,6 @@ public class TaskManagerComponentsStartupShutdownTest { }; }}; - // the components should now all be initialized - assertTrue(network.isAssociated()); - // shut down all actors and the actor system // Kill the Task down the JobManager taskManager.tell(Kill.getInstance(), ActorRef.noSender()); @@ -156,7 +165,6 @@ public class TaskManagerComponentsStartupShutdownTest { actorSystem = null; // now that the TaskManager is shut down, the components should be shut down as well - assertFalse(network.isAssociated()); assertTrue(network.isShutdown()); assertTrue(ioManager.isProperlyShutDown()); assertTrue(memManager.isShutdown()); http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java index e23aba7..53fa7c1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java @@ -28,12 +28,10 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.FlinkResourceManager; import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager; -import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage; @@ -47,15 +45,11 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import scala.Option; -import scala.Some; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; -import java.io.IOException; -import java.net.InetAddress; -import java.net.ServerSocket; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -548,69 +542,6 @@ public class TaskManagerRegistrationTest extends TestLogger { }}; } - - @Test - public void testStartupWhenNetworkStackFailsToInitialize() { - - ServerSocket blocker = null; - - try { - blocker = new ServerSocket(0, 50, InetAddress.getByName("localhost")); - - final Configuration cfg = new Configuration(); - cfg.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost"); - cfg.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, blocker.getLocalPort()); - cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1); - - new JavaTestKit(actorSystem) {{ - ActorRef taskManager = null; - ActorRef jobManager = null; - ActorRef resourceManager = null; - - try { - // a simple JobManager - jobManager = startJobManager(config); - - resourceManager = startResourceManager(config, jobManager); - - // start a task manager with a configuration that provides a blocked port - taskManager = TaskManager.startTaskManagerComponentsAndActor( - cfg, ResourceID.generate(), actorSystem, "localhost", - NONE_STRING, // no actor name -> random - new Some<LeaderRetrievalService>(new StandaloneLeaderRetrievalService(jobManager.path().toString())), - false, // init network stack !!! - TaskManager.class); - - watch(taskManager); - - expectTerminated(timeout, taskManager); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } finally { - stopActor(taskManager); - stopActor(jobManager); - } - }}; - } - catch (Exception e) { - // does not work, skip test - e.printStackTrace(); - fail(e.getMessage()); - } - finally { - if (blocker != null) { - try { - blocker.close(); - } - catch (IOException e) { - // ignore, best effort - } - } - } - } - @Test public void testCheckForValidRegistrationSessionIDs() { new JavaTestKit(actorSystem) {{ http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java index 149df6e..686de76 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java @@ -25,9 +25,10 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.util.StartupUtils; -import org.junit.Before; import org.junit.Test; +import scala.Option; import java.io.File; import java.io.IOException; @@ -179,4 +180,43 @@ public class TaskManagerStartupTest { fail(e.getMessage()); } } + + /** + * Tests that the task manager start-up fails if the network stack cannot be initialized. + * @throws Exception + */ + @Test(expected = IOException.class) + public void testStartupWhenNetworkStackFailsToInitialize() throws Exception { + + ServerSocket blocker = null; + + try { + blocker = new ServerSocket(0, 50, InetAddress.getByName("localhost")); + + final Configuration cfg = new Configuration(); + cfg.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost"); + cfg.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, blocker.getLocalPort()); + cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1); + + TaskManager.startTaskManagerComponentsAndActor( + cfg, + ResourceID.generate(), + null, + "localhost", + Option.<String>empty(), + Option.<LeaderRetrievalService>empty(), + false, + TaskManager.class); + } + finally { + if (blocker != null) { + try { + blocker.close(); + } + catch (IOException e) { + // ignore, best effort + } + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 54cd7c6..0e53673 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -990,11 +990,8 @@ public class TaskManagerTest extends TestLogger { jobManager = new AkkaActorGateway(jm, leaderSessionID); - final int dataPort = NetUtils.getAvailablePort(); final Configuration config = new Configuration(); - config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort); - taskManager = TestingUtils.createTaskManager( system, jobManager, http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java index fea21be..cfa7fb6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java @@ -66,10 +66,20 @@ public class TaskStopTest { when(tddMock.getSerializedExecutionConfig()).thenReturn(mock(SerializedValue.class)); when(tddMock.getInvokableClassName()).thenReturn("className"); - task = new Task(tddMock, mock(MemoryManager.class), mock(IOManager.class), mock(NetworkEnvironment.class), - mock(BroadcastVariableManager.class), mock(ActorGateway.class), mock(ActorGateway.class), - mock(FiniteDuration.class), mock(LibraryCacheManager.class), mock(FileCache.class), - mock(TaskManagerRuntimeInfo.class), mock(TaskMetricGroup.class)); + task = new Task( + tddMock, + mock(MemoryManager.class), + mock(IOManager.class), + mock(NetworkEnvironment.class), + mock(JobManagerCommunicationFactory.class), + mock(BroadcastVariableManager.class), + mock(ActorGateway.class), + mock(ActorGateway.class), + mock(FiniteDuration.class), + mock(LibraryCacheManager.class), + mock(FileCache.class), + mock(TaskManagerRuntimeInfo.class), + mock(TaskMetricGroup.class)); Field f = task.getClass().getDeclaredField("invokable"); f.setAccessible(true); f.set(task, taskMock); http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index f145b48..9e8f8f8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -244,12 +244,14 @@ public class TaskTest { ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class); NetworkEnvironment network = mock(NetworkEnvironment.class); - when(network.getPartitionManager()).thenReturn(partitionManager); - when(network.getPartitionConsumableNotifier()).thenReturn(consumableNotifier); + when(network.getResultPartitionManager()).thenReturn(partitionManager); when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC); doThrow(new RuntimeException("buffers")).when(network).registerTask(any(Task.class)); + + JobManagerCommunicationFactory jobManagerCommunicationFactory = mock(JobManagerCommunicationFactory.class); + when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenReturn(consumableNotifier); - Task task = createTask(TestInvokableCorrect.class, libCache, network); + Task task = createTask(TestInvokableCorrect.class, libCache, network, jobManagerCommunicationFactory); task.registerExecutionListener(listenerGateway); @@ -598,18 +600,22 @@ public class TaskTest { ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class); NetworkEnvironment network = mock(NetworkEnvironment.class); - when(network.getPartitionManager()).thenReturn(partitionManager); - when(network.getPartitionConsumableNotifier()).thenReturn(consumableNotifier); + JobManagerCommunicationFactory jobManagerCommunicationFactory = mock(JobManagerCommunicationFactory.class); + when(network.getResultPartitionManager()).thenReturn(partitionManager); when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC); when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) .thenReturn(mock(TaskKvStateRegistry.class)); + + when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenReturn(consumableNotifier); - return createTask(invokable, libCache, network); + return createTask(invokable, libCache, network, jobManagerCommunicationFactory); } - private Task createTask(Class<? extends AbstractInvokable> invokable, - LibraryCacheManager libCache, - NetworkEnvironment networkEnvironment) { + private Task createTask( + Class<? extends AbstractInvokable> invokable, + LibraryCacheManager libCache, + NetworkEnvironment networkEnvironment, + JobManagerCommunicationFactory jobManagerCommunicationFactory) { TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(invokable); @@ -618,6 +624,7 @@ public class TaskTest { mock(MemoryManager.class), mock(IOManager.class), networkEnvironment, + jobManagerCommunicationFactory, mock(BroadcastVariableManager.class), taskManagerGateway, jobManagerGateway, http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index f8b4063..e1c9407 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -41,13 +41,12 @@ import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractCloseableHandle; import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.taskmanager.JobManagerCommunicationFactory; import org.apache.flink.runtime.state.KeyGroupsStateHandle; -import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.EnvironmentInformation; -import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -148,19 +147,20 @@ public class InterruptSensitiveRestoreTest { .thenReturn(mock(TaskKvStateRegistry.class)); return new Task( - tdd, - mock(MemoryManager.class), - mock(IOManager.class), - networkEnvironment, - mock(BroadcastVariableManager.class), - mock(ActorGateway.class), - mock(ActorGateway.class), - new FiniteDuration(10, TimeUnit.SECONDS), - new FallbackLibraryCacheManager(), - new FileCache(new Configuration()), - new TaskManagerRuntimeInfo( - "localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()), - new UnregisteredTaskMetricsGroup()); + tdd, + mock(MemoryManager.class), + mock(IOManager.class), + networkEnvironment, + mock(JobManagerCommunicationFactory.class), + mock(BroadcastVariableManager.class), + mock(ActorGateway.class), + mock(ActorGateway.class), + new FiniteDuration(10, TimeUnit.SECONDS), + new FallbackLibraryCacheManager(), + new FileCache(new Configuration()), + new TaskManagerRuntimeInfo( + "localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()), + new UnregisteredTaskMetricsGroup()); } http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 408b5b1..0a9d2fa 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -45,6 +45,7 @@ import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.messages.TaskMessages; import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.taskmanager.JobManagerCommunicationFactory; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -81,9 +82,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class StreamTaskTest { @@ -228,12 +226,14 @@ public class StreamTaskTest { ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class); NetworkEnvironment network = mock(NetworkEnvironment.class); - when(network.getPartitionManager()).thenReturn(partitionManager); - when(network.getPartitionConsumableNotifier()).thenReturn(consumableNotifier); + when(network.getResultPartitionManager()).thenReturn(partitionManager); when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC); when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) .thenReturn(mock(TaskKvStateRegistry.class)); + JobManagerCommunicationFactory jobManagerCommunicationFactory = mock(JobManagerCommunicationFactory.class); + when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenReturn(consumableNotifier); + TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(), new SerializedValue<>(new ExecutionConfig()), @@ -248,18 +248,19 @@ public class StreamTaskTest { 0); return new Task( - tdd, - mock(MemoryManager.class), - mock(IOManager.class), - network, - mock(BroadcastVariableManager.class), - new DummyGateway(), - new DummyGateway(), - new FiniteDuration(60, TimeUnit.SECONDS), - libCache, - mock(FileCache.class), - new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")), - mock(TaskMetricGroup.class)); + tdd, + mock(MemoryManager.class), + mock(IOManager.class), + network, + jobManagerCommunicationFactory, + mock(BroadcastVariableManager.class), + new DummyGateway(), + new DummyGateway(), + new FiniteDuration(60, TimeUnit.SECONDS), + libCache, + mock(FileCache.class), + new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")), + mock(TaskMetricGroup.class)); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 82dbd1f..8915bff 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -233,13 +233,19 @@ public class RescalingITCase extends TestLogger { cluster.submitJobDetached(jobGraph); - Future<Object> allTasksRunning = jobManager.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobID), deadline.timeLeft()); + Object savepointResponse = null; - Await.ready(allTasksRunning, deadline.timeLeft()); + // we might be too early for taking a savepoint if the operators have not been started yet + while (deadline.hasTimeLeft()) { - Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID), deadline.timeLeft()); + Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID), deadline.timeLeft()); + + savepointResponse = Await.result(savepointPathFuture, deadline.timeLeft()); - Object savepointResponse = Await.result(savepointPathFuture, deadline.timeLeft()); + if (savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess) { + break; + } + } assertTrue(savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess); http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala index 8b39f52..107801d 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala @@ -18,17 +18,13 @@ package org.apache.flink.yarn -import org.apache.flink.runtime.clusterframework.messages.StopCluster import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.instance.InstanceConnectionInfo import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService import org.apache.flink.runtime.memory.MemoryManager -import org.apache.flink.runtime.taskmanager.{TaskManagerConfiguration, TaskManager} -import org.apache.flink.runtime.util.ProcessShutDownThread - -import scala.concurrent.duration._ +import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration} /** An extension of the TaskManager that listens for additional YARN related * messages.