This is an automated email from the ASF dual-hosted git repository. aajisaka pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 663ca14 MAPREDUCE-7329: HadoopPipes task may fail when linux kernel version change from 3.x to 4.x (#2775) 663ca14 is described below commit 663ca14a769bd8fa124c1aff4ac6630491dbb425 Author: lichaojacobs <lichaojac...@tju.edu.cn> AuthorDate: Fri Apr 9 10:58:53 2021 +0800 MAPREDUCE-7329: HadoopPipes task may fail when linux kernel version change from 3.x to 4.x (#2775) --- .../apache/hadoop/mapred/pipes/Application.java | 52 ++++++++++++++ .../hadoop/mapred/pipes/TestPipeApplication.java | 83 ++++++++++++++++++++++ 2 files changed, 135 insertions(+) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java index 83d2509..5416d26 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java @@ -30,12 +30,14 @@ import java.util.Random; import javax.crypto.SecretKey; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -52,6 +54,7 @@ import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; @@ -66,6 +69,7 @@ class Application<K1 extends WritableComparable, V1 extends Writable, private static final Logger LOG = LoggerFactory.getLogger(Application.class.getName()); private ServerSocket serverSocket; + private PingSocketCleaner socketCleaner; private Process process; private Socket clientSocket; private OutputHandler<K2, V2> handler; @@ -133,6 +137,13 @@ class Application<K1 extends WritableComparable, V1 extends Writable, process = runClient(cmd, env); clientSocket = serverSocket.accept(); + // start ping socket cleaner + int soTimeout = conf.getInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, + CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT); + socketCleaner = new PingSocketCleaner("ping-socket-cleaner", serverSocket, + soTimeout); + socketCleaner.setDaemon(true); + socketCleaner.start(); String challenge = getSecurityChallenge(); String digestToSend = createDigest(password, challenge); @@ -237,6 +248,7 @@ class Application<K1 extends WritableComparable, V1 extends Writable, serverSocket.close(); try { downlink.close(); + socketCleaner.interrupt(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } @@ -266,4 +278,44 @@ class Application<K1 extends WritableComparable, V1 extends Writable, return SecureShuffleUtils.hashFromString(data, key); } + @VisibleForTesting + public static class PingSocketCleaner extends Thread { + private final ServerSocket serverSocket; + private final int soTimeout; + + PingSocketCleaner(String name, ServerSocket serverSocket, int soTimeout) { + super(name); + this.serverSocket = serverSocket; + this.soTimeout = soTimeout; + } + + @Override + public void run() { + LOG.info("PingSocketCleaner started..."); + while (!Thread.currentThread().isInterrupted()) { + Socket clientSocket = null; + try { + clientSocket = serverSocket.accept(); + clientSocket.setSoTimeout(soTimeout); + LOG.debug("Connection received from {}", + clientSocket.getInetAddress()); + int readData = 0; + while (readData != -1) { + readData = clientSocket.getInputStream().read(); + } + LOG.debug("close socket cause client has closed."); + closeSocketInternal(clientSocket); + } catch (IOException exception) { + LOG.error("PingSocketCleaner exception", exception); + } finally { + closeSocketInternal(clientSocket); + } + } + } + + @VisibleForTesting + protected void closeSocketInternal(Socket clientSocket) { + IOUtils.closeSocket(clientSocket); + } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java index f0b383a..79c0bc1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java @@ -28,12 +28,15 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.PrintStream; +import java.net.ServerSocket; +import java.net.Socket; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FsConstants; @@ -59,7 +62,9 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapred.TaskLog; +import org.apache.hadoop.mapred.pipes.Application.PingSocketCleaner; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -455,6 +460,84 @@ public class TestPipeApplication { assertEquals(3, partitioner.getPartition(iw, new Text("test"), 2)); } + @Test + public void testSocketCleaner() throws Exception { + ServerSocket serverSocket = setupServerSocket(); + SocketCleaner cleaner = setupCleaner(serverSocket); + // mock ping thread, connect to server socket per second. + int expectedClosedCount = 5; + for (int i = 0; i < expectedClosedCount; i++) { + try { + Thread.sleep(1000); + Socket clientSocket = new Socket(serverSocket.getInetAddress(), + serverSocket.getLocalPort()); + clientSocket.close(); + } catch (Exception exception) { + // ignored... + exception.printStackTrace(); + } + } + GenericTestUtils.waitFor( + () -> expectedClosedCount == cleaner.getCloseSocketCount(), 100, 5000); + } + + @Test + public void testSocketTimeout() throws Exception { + ServerSocket serverSocket = setupServerSocket(); + SocketCleaner cleaner = setupCleaner(serverSocket, 100); + try { + new Socket(serverSocket.getInetAddress(), serverSocket.getLocalPort()); + Thread.sleep(1000); + } catch (Exception exception) { + // ignored... + } + GenericTestUtils.waitFor(() -> 1 == cleaner.getCloseSocketCount(), 100, + 5000); + } + + private SocketCleaner setupCleaner(ServerSocket serverSocket) { + return setupCleaner(serverSocket, + CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT); + } + + private SocketCleaner setupCleaner(ServerSocket serverSocket, int soTimeout) { + // start socket cleaner. + SocketCleaner cleaner = new SocketCleaner("test-ping-socket-cleaner", + serverSocket, soTimeout); + cleaner.setDaemon(true); + cleaner.start(); + + return cleaner; + } + + private static class SocketCleaner extends PingSocketCleaner { + private int closeSocketCount = 0; + + SocketCleaner(String name, ServerSocket serverSocket, int soTimeout) { + super(name, serverSocket, soTimeout); + } + + @Override + public void run() { + super.run(); + } + + protected void closeSocketInternal(Socket clientSocket) { + if (!clientSocket.isClosed()) { + closeSocketCount++; + } + super.closeSocketInternal(clientSocket); + } + + public int getCloseSocketCount() { + return closeSocketCount; + } + } + + private ServerSocket setupServerSocket() throws Exception { + return new ServerSocket(0, 1); + } + /** * clean previous std error and outs */ --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org