[ https://issues.apache.org/jira/browse/HDFS-15413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793170#comment-17793170 ]
ASF GitHub Bot commented on HDFS-15413:
---------------------------------------

Neilxzn commented on code in PR #5829:
URL: https://github.com/apache/hadoop/pull/5829#discussion_r1415029913


##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java:
##########

@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
+import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class TestDFSStripedInputStreamWithTimeout {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestDFSStripedInputStreamWithTimeout.class);
+
+  private MiniDFSCluster cluster;
+  private Configuration conf = new Configuration();
+  private DistributedFileSystem fs;
+  private final Path dirPath = new Path("/striped");
+  private Path filePath = new Path(dirPath, "file");
+  private ErasureCodingPolicy ecPolicy;
+  private short dataBlocks;
+  private short parityBlocks;
+  private int cellSize;
+  private final int stripesPerBlock = 2;
+  private int blockSize;
+  private int blockGroupSize;
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(300000);
+
+  public ErasureCodingPolicy getEcPolicy() {
+    return StripedFileTestUtil.getDefaultECPolicy();
+  }
+
+  @Before
+  public void setup() throws IOException {
+    /*
+     * Initialize the erasure coding policy.
+     */
+    ecPolicy = getEcPolicy();
+    dataBlocks = (short) ecPolicy.getNumDataUnits();
+    parityBlocks = (short) ecPolicy.getNumParityUnits();
+    cellSize = ecPolicy.getCellSize();
+    blockSize = stripesPerBlock * cellSize;
+    blockGroupSize = dataBlocks * blockSize;
+    System.out.println("EC policy = " + ecPolicy);
+
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, 1000);
+    // Set the config for the HDFS client.
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 1000);
+    conf.setInt(HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS, 3);
+
+    if (ErasureCodeNative.isNativeCodeLoaded()) {
+      conf.set(
+          CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
+          NativeRSRawErasureCoderFactory.CODER_NAME);
+    }
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
+        GenericTestUtils.getRandomizedTempPath());
+    SimulatedFSDataset.setFactory(conf);
+    startUp();
+  }
+
+  private void startUp() throws IOException {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        dataBlocks + parityBlocks).build();
+    cluster.waitActive();
+    for (DataNode dn : cluster.getDataNodes()) {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+    }
+    fs = cluster.getFileSystem();
+    fs.enableErasureCodingPolicy(getEcPolicy().getName());
+    fs.mkdirs(dirPath);
+    fs.getClient()
+        .setErasureCodingPolicy(dirPath.toString(), ecPolicy.getName());
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test
+  public void testPreadTimeout() throws Exception {
+    final int numBlocks = 2;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        stripesPerBlock, false, ecPolicy);
+    final int fileSize = numBlocks * blockGroupSize;
+
+    LocatedBlocks lbs = fs.getClient().namenode.
+        getBlockLocations(filePath.toString(), 0, fileSize);
+
+    for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+      assert lb instanceof LocatedStripedBlock;
+      LocatedStripedBlock bg = (LocatedStripedBlock) (lb);
+      for (int i = 0; i < dataBlocks; i++) {
+        Block blk = new Block(bg.getBlock().getBlockId() + i,
+            stripesPerBlock * cellSize,
+            bg.getBlock().getGenerationStamp());
+        blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+        cluster.injectBlocks(i, Arrays.asList(blk),
+            bg.getBlock().getBlockPoolId());
+      }
+    }
+
+    DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
+        filePath.toString(), false, ecPolicy, null);
+    int bufLen = 1024 * 100;
+    byte[] buf = new byte[bufLen];
+    int readTotal = 0;
+    try {
+      while (readTotal < fileSize) {
+        in.seek(readTotal);
+        int nread = in.read(buf, 0, bufLen);
+        readTotal += nread;
+        // Simulate a time-consuming processing step, such as a UDF.
+        Thread.sleep(10);
+      }
+      Assert.assertEquals("Should read the entire striped file.",
+          fileSize, readTotal);
+    } catch (Exception e) {
+      Assert.fail("Read of striped file should not time out: " + e);
+    }
+    in.close();

Review Comment:
   Fixed it and added testReadFileWithAttempt as a unit test. Please review it again. Thank you!
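As background for reviewers: the sketch below illustrates the bounded-retry pattern that a cap like HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS is meant to limit. It is a minimal, self-contained illustration and not the actual DFSStripedInputStream change; ReaderFactory is a hypothetical stand-in for re-creating a block reader after a datanode drops an idle connection.

{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;

public class BoundedRetryRead {

  /** Hypothetical hook that opens (or re-opens) a reader to a datanode. */
  @FunctionalInterface
  interface ReaderFactory {
    InputStream open() throws IOException;
  }

  /**
   * Read up to len bytes into buf, re-opening the reader and retrying when
   * the remote side has closed an idle connection, up to maxAttempts tries.
   */
  static int readWithAttempts(ReaderFactory factory, byte[] buf, int off,
      int len, int maxAttempts) throws IOException {
    SocketTimeoutException lastTimeout = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      // Re-open on every attempt: the old socket is already gone, so
      // retrying on the same stream would fail again immediately.
      try (InputStream in = factory.open()) {
        return in.read(buf, off, len);
      } catch (SocketTimeoutException e) {
        lastTimeout = e;
      }
    }
    // Attempts exhausted (or maxAttempts < 1): surface the failure.
    throw lastTimeout != null ? lastTimeout
        : new IOException("maxAttempts must be at least 1");
  }
}
{code}

The point of the attempt limit is that a datanode which has merely dropped an idle connection is recoverable, while a datanode that is genuinely gone should still fail the read after a bounded number of tries rather than looping forever.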
> DFSStripedInputStream throws exception when datanodes close idle connections
> -----------------------------------------------------------------------------
>
>                 Key: HDFS-15413
>                 URL: https://issues.apache.org/jira/browse/HDFS-15413
>             Project: Hadoop HDFS
>          Issue Type: Bug
>          Components: ec, erasure-coding, hdfs-client
>    Affects Versions: 3.1.3
>        Environment: - Hadoop 3.1.3
> - erasure coding with ISA-L and the RS-3-2-1024k scheme
> - running in Kubernetes
> - dfs.client.socket-timeout = 10000
> - dfs.datanode.socket.write.timeout = 10000
>            Reporter: Andrey Elenskiy
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: out.log
>
> We've run into an issue with compactions failing in HBase when erasure coding
> is enabled on a table directory. After digging further, I was able to narrow
> it down to the seek + read logic and to reproduce the issue with the HDFS
> client only:
> {code:java}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.FSDataInputStream;
>
> public class ReaderRaw {
>     public static void main(final String[] args) throws Exception {
>         Path p = new Path(args[0]);
>         int bufLen = Integer.parseInt(args[1]);
>         int sleepDuration = Integer.parseInt(args[2]);
>         int countBeforeSleep = Integer.parseInt(args[3]);
>         int countAfterSleep = Integer.parseInt(args[4]);
>         Configuration conf = new Configuration();
>         FSDataInputStream istream = FileSystem.get(conf).open(p);
>         byte[] buf = new byte[bufLen];
>         int readTotal = 0;
>         int count = 0;
>         try {
>             while (true) {
>                 istream.seek(readTotal);
>                 int bytesRemaining = bufLen;
>                 int bufOffset = 0;
>                 while (bytesRemaining > 0) {
>                     // Read into the unfilled portion of the buffer.
>                     int nread = istream.read(buf, bufOffset, bytesRemaining);
>                     if (nread < 0) {
>                         throw new Exception("nread is less than zero");
>                     }
>                     readTotal += nread;
>                     bufOffset += nread;
>                     bytesRemaining -= nread;
>                 }
>                 count++;
>                 if (count == countBeforeSleep) {
>                     System.out.println("sleeping for " + sleepDuration + " milliseconds");
>                     Thread.sleep(sleepDuration);
>                     System.out.println("resuming");
>                 }
>                 if (count == countBeforeSleep + countAfterSleep) {
>                     System.out.println("done");
>                     break;
>                 }
>             }
>         } catch (Exception e) {
>             System.out.println("exception on read " + count + " read total " + readTotal);
>             throw e;
>         }
>     }
> }
> {code}
> The issue appears to be due to the fact that datanodes close the connection
> to the EC client if it doesn't fetch the next packet for longer than
> dfs.client.socket-timeout. The EC client doesn't retry and instead assumes
> that those datanodes went away, resulting in a "missing blocks" exception.
> I was able to consistently reproduce it with the following arguments:
> {noformat}
> bufLen = 1000000 (just below 1MB, which is the size of the stripe)
> sleepDuration = (dfs.client.socket-timeout + 1) * 1000 (in our case 11000)
> countBeforeSleep = 1
> countAfterSleep = 7
> {noformat}
> I've attached the entire log output of running the snippet above against an
> erasure-coded file with the RS-3-2-1024k policy. And here are the logs from
> the datanodes disconnecting the client:
> datanode 1:
> {noformat}
> 2020-06-15 19:06:20,697 INFO datanode.DataNode: Likely the client has stopped
> reading, disconnecting it (datanode-v11-0-hadoop.hadoop:9866:DataXceiver
> error processing READ_BLOCK operation src: /10.128.23.40:53748 dst:
> /10.128.14.46:9866); java.net.SocketTimeoutException: 10000 millis timeout
> while waiting for channel to be ready for write.
> ch : java.nio.channels.SocketChannel[connected local=/10.128.14.46:9866
> remote=/10.128.23.40:53748]
> {noformat}
> datanode 2:
> {noformat}
> 2020-06-15 19:06:20,341 INFO datanode.DataNode: Likely the client has stopped
> reading, disconnecting it (datanode-v11-1-hadoop.hadoop:9866:DataXceiver
> error processing READ_BLOCK operation src: /10.128.23.40:48772 dst:
> /10.128.9.42:9866); java.net.SocketTimeoutException: 10000 millis timeout
> while waiting for channel to be ready for write. ch :
> java.nio.channels.SocketChannel[connected local=/10.128.9.42:9866
> remote=/10.128.23.40:48772]
> {noformat}
> datanode 3:
> {noformat}
> 2020-06-15 19:06:20,467 INFO datanode.DataNode: Likely the client has stopped
> reading, disconnecting it (datanode-v11-3-hadoop.hadoop:9866:DataXceiver
> error processing READ_BLOCK operation src: /10.128.23.40:57184 dst:
> /10.128.16.13:9866); java.net.SocketTimeoutException: 10000 millis timeout
> while waiting for channel to be ready for write. ch :
> java.nio.channels.SocketChannel[connected local=/10.128.16.13:9866
> remote=/10.128.23.40:57184]
> {noformat}
> I've tried running the same code against non-EC files with a replication
> factor of 3 and was not able to reproduce the issue with any parameters.
> Looking through the code, it's pretty clear that the non-EC DFSInputStream
> retries reads after an exception:
> https://github.com/apache/hadoop/blob/trunk/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java#L844
> Let me know if you need any more information that can help you out with
> addressing this issue.
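Until a fix lands, one interim mitigation (an assumption on my part, not something verified in this thread) is to raise the idle-connection timeouts on both sides so that slow consumers such as HBase compactions stay under the datanode's disconnect threshold. The keys below are the ones from the Environment field above; the 60000 ms value is only an example:

{code:java}
import org.apache.hadoop.conf.Configuration;

public class TimeoutMitigation {
  public static Configuration withLongerTimeouts() {
    Configuration conf = new Configuration();
    // Client-side read timeout in ms; the reporter ran with 10000.
    conf.setInt("dfs.client.socket-timeout", 60000);
    // Datanode-side write timeout in ms; this governs when the datanode
    // drops an idle reader ("Likely the client has stopped reading").
    conf.setInt("dfs.datanode.socket.write.timeout", 60000);
    return conf;
  }
}
{code}

This only delays the disconnect rather than preventing it, so a client-side retry like the one in PR #5829 remains the real fix.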