Neilxzn commented on code in PR #5829:
URL: https://github.com/apache/hadoop/pull/5829#discussion_r1415029913


##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java:
##########
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
+import 
org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class TestDFSStripedInputStreamWithTimeout {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestDFSStripedInputStreamWithTimeout.class);
+
+  private MiniDFSCluster cluster;
+  private Configuration conf = new Configuration();
+  private DistributedFileSystem fs;
+  private final Path dirPath = new Path("/striped");
+  private Path filePath = new Path(dirPath, "file");
+  private ErasureCodingPolicy ecPolicy;
+  private short dataBlocks;
+  private short parityBlocks;
+  private int cellSize;
+  private final int stripesPerBlock = 2;
+  private int blockSize;
+  private int blockGroupSize;
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(300000);
+
+  public ErasureCodingPolicy getEcPolicy() {
+    return StripedFileTestUtil.getDefaultECPolicy();
+  }
+
+  @Before
+  public void setup() throws IOException {
+    /*
+     * Initialize erasure coding policy.
+     */
+    ecPolicy = getEcPolicy();
+    dataBlocks = (short) ecPolicy.getNumDataUnits();
+    parityBlocks = (short) ecPolicy.getNumParityUnits();
+    cellSize = ecPolicy.getCellSize();
+    blockSize = stripesPerBlock * cellSize;
+    blockGroupSize = dataBlocks * blockSize;
+    System.out.println("EC policy = " + ecPolicy);
+
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, 1000);
+    // SET CONFIG FOR HDFS CLIENT
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 1000);
+    conf.setInt(HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS, 3);
+
+    if (ErasureCodeNative.isNativeCodeLoaded()) {
+      conf.set(
+          CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
+          NativeRSRawErasureCoderFactory.CODER_NAME);
+    }
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
+        GenericTestUtils.getRandomizedTempPath());
+    SimulatedFSDataset.setFactory(conf);
+    startUp();
+  }
+
+  private void startUp() throws IOException {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        dataBlocks + parityBlocks).build();
+    cluster.waitActive();
+    for (DataNode dn : cluster.getDataNodes()) {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+    }
+    fs = cluster.getFileSystem();
+    fs.enableErasureCodingPolicy(getEcPolicy().getName());
+    fs.mkdirs(dirPath);
+    fs.getClient()
+        .setErasureCodingPolicy(dirPath.toString(), ecPolicy.getName());
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test
+  public void testPreadTimeout() throws Exception {
+    final int numBlocks = 2;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        stripesPerBlock, false, ecPolicy);
+    final int fileSize = numBlocks * blockGroupSize;
+
+    LocatedBlocks lbs = fs.getClient().namenode.
+        getBlockLocations(filePath.toString(), 0, fileSize);
+
+    for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+      assert lb instanceof LocatedStripedBlock;
+      LocatedStripedBlock bg = (LocatedStripedBlock) (lb);
+      for (int i = 0; i < dataBlocks; i++) {
+        Block blk = new Block(bg.getBlock().getBlockId() + i,
+            stripesPerBlock * cellSize,
+            bg.getBlock().getGenerationStamp());
+        blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+        cluster.injectBlocks(i, Arrays.asList(blk),
+            bg.getBlock().getBlockPoolId());
+      }
+    }
+
+    DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
+        filePath.toString(), false, ecPolicy, null);
+    int bufLen = 1024 * 100;
+    byte[] buf = new byte[bufLen];
+    int readTotal = 0;
+    try {
+      while (readTotal < fileSize) {
+        in.seek(readTotal);
+        int nread = in.read(buf, 0, bufLen);
+        readTotal += nread;
+        // Simulated time-consuming processing operations, such as UDF.
+        Thread.sleep(10);
+      }
+      Assert.assertEquals("Success to read striped file.", fileSize, 
readTotal);
+    } catch (Exception e) {
+      Assert.fail("Fail to read striped time out. ");
+    }
+    in.close();

Review Comment:
   fix it and add testReadFileWithAttempt for unit test. Please review it 
again. Thank you!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to