This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new a5e8bb4 [hotfix][network] Remove unused IOMode from NetworkEnvironmentConfiguration a5e8bb4 is described below commit a5e8bb4efa35d2411121a7ce715bf2cd59e9cde4 Author: zhijiang <wangzhijiang...@aliyun.com> AuthorDate: Fri Mar 29 19:00:20 2019 +0800 [hotfix][network] Remove unused IOMode from NetworkEnvironmentConfiguration --- .../flink/configuration/ConfigConstants.java | 8 ++--- .../flink/runtime/io/disk/iomanager/IOManager.java | 16 ---------- .../TaskManagerServicesConfiguration.java | 15 ---------- .../NetworkEnvironmentConfiguration.java | 34 ---------------------- .../taskexecutor/NetworkBufferCalculationTest.java | 1 - 5 files changed, 4 insertions(+), 70 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 362d2f9..f1c6b85 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -284,9 +284,9 @@ public final class ConfigConstants { public static final String TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY = "taskmanager.memory.segment-size"; /** - * The implementation to use for spillable/spilled intermediate results, which have both - * synchronous and asynchronous implementations: "sync" or "async". + * @deprecated Not used anymore */ + @Deprecated public static final String TASK_MANAGER_NETWORK_DEFAULT_IO_MODE = "taskmanager.network.defaultIOMode"; /** @@ -1435,9 +1435,9 @@ public final class ConfigConstants { public static final int DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE = 32768; /** - * The implementation to use for spillable/spilled intermediate results, which have both - * synchronous and asynchronous implementations: "sync" or "async". 
+ * @deprecated Not used anymore */ + @Deprecated public static final String DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE = "sync"; /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java index 6c84d7f..0aaadf0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java @@ -36,22 +36,6 @@ import java.util.concurrent.LinkedBlockingQueue; * The facade for the provided I/O manager services. */ public abstract class IOManager { - - public enum IOMode { - - SYNC(true), ASYNC(false); - - private final boolean isSynchronous; - - IOMode(boolean isSynchronous) { - this.isSynchronous = isSynchronous; - } - - public boolean isSynchronous() { - return isSynchronous; - } - } - /** Logging */ protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index 124d46b..e791a92 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.CheckpointingOptions; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.IllegalConfigurationException; @@ -29,7 +28,6 @@ import org.apache.flink.configuration.QueryableStateOptions; import 
org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.netty.NettyConfig; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration; @@ -379,18 +377,6 @@ public class TaskManagerServicesConfiguration { nettyConfig = null; } - // Default spill I/O mode for intermediate results - final String syncOrAsync = configuration.getString( - ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE, - ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE); - - final IOManager.IOMode ioMode; - if (syncOrAsync.equals("async")) { - ioMode = IOManager.IOMode.ASYNC; - } else { - ioMode = IOManager.IOMode.SYNC; - } - int initialRequestBackoff = configuration.getInteger( TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL); int maxRequestBackoff = configuration.getInteger( @@ -406,7 +392,6 @@ public class TaskManagerServicesConfiguration { networkBufMin, networkBufMax, pageSize, - ioMode, initialRequestBackoff, maxRequestBackoff, buffersPerChannel, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java index 6c66c77..fec5178 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.taskmanager; -import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; import org.apache.flink.runtime.io.network.netty.NettyConfig; import javax.annotation.Nullable; @@ -36,8 +35,6 @@ public class NetworkEnvironmentConfiguration { private final int 
networkBufferSize; - private final IOMode ioMode; - private final int partitionRequestInitialBackoff; private final int partitionRequestMaxBackoff; @@ -48,34 +45,11 @@ public class NetworkEnvironmentConfiguration { private final NettyConfig nettyConfig; - /** - * Constructor for a setup with purely local communication (no netty). - */ public NetworkEnvironmentConfiguration( float networkBufFraction, long networkBufMin, long networkBufMax, int networkBufferSize, - IOMode ioMode, - int partitionRequestInitialBackoff, - int partitionRequestMaxBackoff, - int networkBuffersPerChannel, - int floatingNetworkBuffersPerGate) { - - this(networkBufFraction, networkBufMin, networkBufMax, networkBufferSize, - ioMode, - partitionRequestInitialBackoff, partitionRequestMaxBackoff, - networkBuffersPerChannel, floatingNetworkBuffersPerGate, - null); - - } - - public NetworkEnvironmentConfiguration( - float networkBufFraction, - long networkBufMin, - long networkBufMax, - int networkBufferSize, - IOMode ioMode, int partitionRequestInitialBackoff, int partitionRequestMaxBackoff, int networkBuffersPerChannel, @@ -86,7 +60,6 @@ public class NetworkEnvironmentConfiguration { this.networkBufMin = networkBufMin; this.networkBufMax = networkBufMax; this.networkBufferSize = networkBufferSize; - this.ioMode = ioMode; this.partitionRequestInitialBackoff = partitionRequestInitialBackoff; this.partitionRequestMaxBackoff = partitionRequestMaxBackoff; this.networkBuffersPerChannel = networkBuffersPerChannel; @@ -112,10 +85,6 @@ public class NetworkEnvironmentConfiguration { return networkBufferSize; } - public IOMode ioMode() { - return ioMode; - } - public int partitionRequestInitialBackoff() { return partitionRequestInitialBackoff; } @@ -142,7 +111,6 @@ public class NetworkEnvironmentConfiguration { public int hashCode() { int result = 1; result = 31 * result + networkBufferSize; - result = 31 * result + ioMode.hashCode(); result = 31 * result + partitionRequestInitialBackoff; result = 31 * 
result + partitionRequestMaxBackoff; result = 31 * result + networkBuffersPerChannel; @@ -170,7 +138,6 @@ public class NetworkEnvironmentConfiguration { this.partitionRequestMaxBackoff == that.partitionRequestMaxBackoff && this.networkBuffersPerChannel == that.networkBuffersPerChannel && this.floatingNetworkBuffersPerGate == that.floatingNetworkBuffersPerGate && - this.ioMode == that.ioMode && (nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null); } } @@ -182,7 +149,6 @@ public class NetworkEnvironmentConfiguration { ", networkBufMin=" + networkBufMin + ", networkBufMax=" + networkBufMax + ", networkBufferSize=" + networkBufferSize + - ", ioMode=" + ioMode + ", partitionRequestInitialBackoff=" + partitionRequestInitialBackoff + ", partitionRequestMaxBackoff=" + partitionRequestMaxBackoff + ", networkBuffersPerChannel=" + networkBuffersPerChannel + diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java index c164f65..7aa43ef 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java @@ -91,7 +91,6 @@ public class NetworkBufferCalculationTest extends TestLogger { networkBufMin, networkBufMax, checkedDownCast(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes()), - null, TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(), TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(), TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),