[ https://issues.apache.org/jira/browse/HDFS-15413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793170#comment-17793170 ]

ASF GitHub Bot commented on HDFS-15413:
---------------------------------------

Neilxzn commented on code in PR #5829:
URL: https://github.com/apache/hadoop/pull/5829#discussion_r1415029913


##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java:
##########
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
+import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class TestDFSStripedInputStreamWithTimeout {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestDFSStripedInputStreamWithTimeout.class);
+
+  private MiniDFSCluster cluster;
+  private Configuration conf = new Configuration();
+  private DistributedFileSystem fs;
+  private final Path dirPath = new Path("/striped");
+  private Path filePath = new Path(dirPath, "file");
+  private ErasureCodingPolicy ecPolicy;
+  private short dataBlocks;
+  private short parityBlocks;
+  private int cellSize;
+  private final int stripesPerBlock = 2;
+  private int blockSize;
+  private int blockGroupSize;
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(300000);
+
+  public ErasureCodingPolicy getEcPolicy() {
+    return StripedFileTestUtil.getDefaultECPolicy();
+  }
+
+  @Before
+  public void setup() throws IOException {
+    /*
+     * Initialize erasure coding policy.
+     */
+    ecPolicy = getEcPolicy();
+    dataBlocks = (short) ecPolicy.getNumDataUnits();
+    parityBlocks = (short) ecPolicy.getNumParityUnits();
+    cellSize = ecPolicy.getCellSize();
+    blockSize = stripesPerBlock * cellSize;
+    blockGroupSize = dataBlocks * blockSize;
+    System.out.println("EC policy = " + ecPolicy);
+
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+
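+    // Datanode-side: a short write timeout so idle read connections are closed quickly.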
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, 1000);
+    // Client-side: a short socket timeout and a bounded number of striped read attempts.
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 1000);
+    conf.setInt(HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS, 3);
+
+    if (ErasureCodeNative.isNativeCodeLoaded()) {
+      conf.set(
+          CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
+          NativeRSRawErasureCoderFactory.CODER_NAME);
+    }
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
+        GenericTestUtils.getRandomizedTempPath());
+    SimulatedFSDataset.setFactory(conf);
+    startUp();
+  }
+
+  private void startUp() throws IOException {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        dataBlocks + parityBlocks).build();
+    cluster.waitActive();
+    for (DataNode dn : cluster.getDataNodes()) {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+    }
+    fs = cluster.getFileSystem();
+    fs.enableErasureCodingPolicy(getEcPolicy().getName());
+    fs.mkdirs(dirPath);
+    fs.getClient()
+        .setErasureCodingPolicy(dirPath.toString(), ecPolicy.getName());
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test
+  public void testPreadTimeout() throws Exception {
+    final int numBlocks = 2;
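+    // Create the striped file's metadata on the NameNode; internal blocks are injected below.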
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        stripesPerBlock, false, ecPolicy);
+    final int fileSize = numBlocks * blockGroupSize;
+
+    LocatedBlocks lbs = fs.getClient().namenode.
+        getBlockLocations(filePath.toString(), 0, fileSize);
+
+    for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+      assert lb instanceof LocatedStripedBlock;
+      LocatedStripedBlock bg = (LocatedStripedBlock) (lb);
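+      // Inject one simulated internal block per data unit of the block group.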
+      for (int i = 0; i < dataBlocks; i++) {
+        Block blk = new Block(bg.getBlock().getBlockId() + i,
+            stripesPerBlock * cellSize,
+            bg.getBlock().getGenerationStamp());
+        blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+        cluster.injectBlocks(i, Arrays.asList(blk),
+            bg.getBlock().getBlockPoolId());
+      }
+    }
+
+    DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
+        filePath.toString(), false, ecPolicy, null);
+    int bufLen = 1024 * 100;
+    byte[] buf = new byte[bufLen];
+    int readTotal = 0;
+    try {
+      while (readTotal < fileSize) {
+        in.seek(readTotal);
+        int nread = in.read(buf, 0, bufLen);
+        readTotal += nread;
+        // Simulate time-consuming processing between reads, such as a UDF.
+        Thread.sleep(10);
+      }
+      Assert.assertEquals("Should read the entire striped file.", fileSize,
+          readTotal);
+    } catch (Exception e) {
+      Assert.fail("Failed to read the striped file: " + e);
+    }
+    in.close();

Review Comment:
   Fixed it and added testReadFileWithAttempt as a unit test. Please review it again. Thank you!





> DFSStripedInputStream throws exception when datanodes close idle connections
> ----------------------------------------------------------------------------
>
>                 Key: HDFS-15413
>                 URL: https://issues.apache.org/jira/browse/HDFS-15413
>             Project: Hadoop HDFS
>          Issue Type: Bug
>          Components: ec, erasure-coding, hdfs-client
>    Affects Versions: 3.1.3
>         Environment: - Hadoop 3.1.3
> - erasure coding with ISA-L and RS-3-2-1024k scheme
> - running in kubernetes
> - dfs.client.socket-timeout = 10000
> - dfs.datanode.socket.write.timeout = 10000
>            Reporter: Andrey Elenskiy
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: out.log
>
>
> We've run into an issue with compactions failing in HBase when erasure coding 
> is enabled on a table directory. After digging further I was able to narrow 
> it down to the seek + read logic and to reproduce the issue with the HDFS 
> client only:
> {code:java}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.FSDataInputStream;
> public class ReaderRaw {
>     public static void main(final String[] args) throws Exception {
>         Path p = new Path(args[0]);
>         int bufLen = Integer.parseInt(args[1]);
>         int sleepDuration = Integer.parseInt(args[2]);
>         int countBeforeSleep = Integer.parseInt(args[3]);
>         int countAfterSleep = Integer.parseInt(args[4]);
>         Configuration conf = new Configuration();
>         FSDataInputStream istream = FileSystem.get(conf).open(p);
>         byte[] buf = new byte[bufLen];
>         int readTotal = 0;
>         int count = 0;
>         try {
>           while (true) {
>             istream.seek(readTotal);
>             int bytesRemaining = bufLen;
>             int bufOffset = 0;
>             while (bytesRemaining > 0) {
>               int nread = istream.read(buf, 0, bufLen);
>               if (nread < 0) {
>                   throw new Exception("nread is less than zero");
>               }
>               readTotal += nread;
>               bufOffset += nread;
>               bytesRemaining -= nread;
>             }
>             count++;
>             if (count == countBeforeSleep) {
>                 System.out.println("sleeping for " + sleepDuration + " milliseconds");
>                 Thread.sleep(sleepDuration);
>                 System.out.println("resuming");
>             }
>             if (count == countBeforeSleep + countAfterSleep) {
>                 System.out.println("done");
>                 break;
>             }
>           }
>         } catch (Exception e) {
>             System.out.println("exception on read " + count + " read total " + readTotal);
>             throw e;
>         }
>     }
> }
> {code}
> The issue appears to be due to the fact that datanodes close the EC client's 
> connection if it doesn't fetch the next packet for longer than 
> dfs.client.socket-timeout. The EC client doesn't retry and instead assumes 
> that those datanodes went away, resulting in a "missing blocks" exception.
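> As a point of comparison only, a crude client-side workaround (hypothetical, not part of any proposed fix) is to catch the failure, reopen the stream and retry the positional read; the class and method names below are made up for illustration:
> {code:java}
> import java.io.IOException;
> import org.apache.hadoop.fs.FSDataInputStream;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> public class ReopeningReader {
>     // Retry a positional read by reopening the stream after the EC client
>     // has given up on datanodes it believes went away.
>     static int readWithReopen(FileSystem fs, Path p, long pos, byte[] buf,
>             int retries) throws IOException {
>         IOException last = null;
>         for (int attempt = 0; attempt <= retries; attempt++) {
>             try (FSDataInputStream in = fs.open(p)) {
>                 return in.read(pos, buf, 0, buf.length); // pread, no seek needed
>             } catch (IOException e) {
>                 last = e; // e.g. the "missing blocks" failure described above
>             }
>         }
>         throw last;
>     }
> }
> {code}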
> I was able to consistently reproduce with the following arguments:
> {noformat}
> bufLen = 1000000 (just below 1MB which is the size of the stripe) 
> sleepDuration = dfs.client.socket-timeout + 1000 (in our case 11000 ms)
> countBeforeSleep = 1
> countAfterSleep = 7
> {noformat}
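> For reference, the sleep can also be derived from the client configuration instead of being hard-coded (hypothetical helper, not part of the snippet above):
> {code:java}
> import org.apache.hadoop.conf.Configuration;
> public class SleepFromConf {
>     public static void main(String[] args) {
>         Configuration conf = new Configuration();
>         // dfs.client.socket-timeout defaults to 60000 ms; this reproduction uses 10000 ms.
>         int socketTimeoutMs = conf.getInt("dfs.client.socket-timeout", 60000);
>         long sleepDurationMs = socketTimeoutMs + 1000L; // 11000 ms with the settings above
>         System.out.println("sleepDuration = " + sleepDurationMs + " ms");
>     }
> }
> {code}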
> I've attached the entire log output of running the snippet above against an 
> erasure-coded file with the RS-3-2-1024k policy. And here are the logs from 
> the datanodes disconnecting the client:
> datanode 1:
> {noformat}
> 2020-06-15 19:06:20,697 INFO datanode.DataNode: Likely the client has stopped 
> reading, disconnecting it (datanode-v11-0-hadoop.hadoop:9866:DataXceiver 
> error processing READ_BLOCK operation  src: /10.128.23.40:53748 dst: 
> /10.128.14.46:9866); java.net.SocketTimeoutException: 10000 millis timeout 
> while waiting for channel to be ready for write. ch : 
> java.nio.channels.SocketChannel[connected local=/10.128.14.46:9866 
> remote=/10.128.23.40:53748]
> {noformat}
> datanode 2:
> {noformat}
> 2020-06-15 19:06:20,341 INFO datanode.DataNode: Likely the client has stopped 
> reading, disconnecting it (datanode-v11-1-hadoop.hadoop:9866:DataXceiver 
> error processing READ_BLOCK operation  src: /10.128.23.40:48772 dst: 
> /10.128.9.42:9866); java.net.SocketTimeoutException: 10000 millis timeout 
> while waiting for channel to be ready for write. ch : 
> java.nio.channels.SocketChannel[connected local=/10.128.9.42:9866 
> remote=/10.128.23.40:48772]
> {noformat}
> datanode 3:
> {noformat}
> 2020-06-15 19:06:20,467 INFO datanode.DataNode: Likely the client has stopped 
> reading, disconnecting it (datanode-v11-3-hadoop.hadoop:9866:DataXceiver 
> error processing READ_BLOCK operation  src: /10.128.23.40:57184 dst: 
> /10.128.16.13:9866); java.net.SocketTimeoutException: 10000 millis timeout 
> while waiting for channel to be ready for write. ch : 
> java.nio.channels.SocketChannel[connected local=/10.128.16.13:9866 
> remote=/10.128.23.40:57184]
> {noformat}
> I've tried running the same code against non-EC files with replication of 3 and 
> was not able to reproduce the issue with any parameters. Looking through the 
> code, it's pretty clear that the non-EC DFSInputStream retries reads after an 
> exception: 
> https://github.com/apache/hadoop/blob/trunk/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java#L844
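> To make the contrast concrete, here is a minimal, self-contained sketch of that "retry against another source" shape (my own simplification, not the actual DFSInputStream code); the EC read path currently has no equivalent:
> {code:java}
> import java.io.IOException;
> import org.apache.hadoop.fs.FSDataInputStream;
> public class RetryingRead {
>     // Illustrative only: retry the read from a different replica on failure,
>     // which is roughly what the replicated (non-EC) read path does.
>     static int readWithRetry(FSDataInputStream in, long pos, byte[] buf,
>             int maxRetries) throws IOException {
>         int retries = maxRetries;
>         while (true) {
>             try {
>                 in.seek(pos);
>                 return in.read(buf, 0, buf.length);
>             } catch (IOException e) {
>                 if (retries-- <= 0) {
>                     throw e;
>                 }
>                 // Ask the stream to switch to a different source before retrying.
>                 in.seekToNewSource(pos);
>             }
>         }
>     }
> }
> {code}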
> Let me know if you need any more information that can help you out with 
> addressing this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
