[ https://issues.apache.org/jira/browse/HDFS-17455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835114#comment-17835114 ]

ASF GitHub Bot commented on HDFS-17455:
---------------------------------------

ZanderXu commented on code in PR #6710:
URL: https://github.com/apache/hadoop/pull/6710#discussion_r1556772812


##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java:
##########
@@ -287,4 +300,69 @@ public void testReadWithoutPreferredCachingReplica() throws IOException {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testCreateBlockReaderWhenInvalidBlockTokenException() throws
+      IOException, InterruptedException, TimeoutException {
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
+    Configuration conf = new Configuration();
+    DFSClientFaultInjector oldFaultInjector = DFSClientFaultInjector.get();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      String file = "/testfile";
+      Path path = new Path(file);
+      long fileLen = 1024 * 64;
+      EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+      FSDataOutputStream out =  fs.create(path, FsPermission.getFileDefault(), createFlags,

Review Comment:
   This `out` stream should be closed in the `finally` block, right?
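
   For illustration, a minimal sketch of how the cleanup could look (variable names are the ones already used in the test above):
   ```java
   FSDataOutputStream out = null;
   try {
     out = fs.create(path, FsPermission.getFileDefault(), createFlags,
         fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, 4096), (short) 3,
         fs.getDefaultBlockSize(path), null);
     // ... write the test data and run the read assertions ...
   } finally {
     // Close the output stream even if an assertion fails, so the MiniDFSCluster
     // in the enclosing try-with-resources block can shut down cleanly.
     if (out != null) {
       out.close();
     }
   }
   ```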
   



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java:
##########
@@ -287,4 +300,69 @@ public void testReadWithoutPreferredCachingReplica() throws IOException {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testCreateBlockReaderWhenInvalidBlockTokenException() throws
+      IOException, InterruptedException, TimeoutException {
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
+    Configuration conf = new Configuration();
+    DFSClientFaultInjector oldFaultInjector = DFSClientFaultInjector.get();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      String file = "/testfile";
+      Path path = new Path(file);
+      long fileLen = 1024 * 64;
+      EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+      FSDataOutputStream out =  fs.create(path, FsPermission.getFileDefault(), createFlags,
+          fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, 4096), (short) 3,
+          fs.getDefaultBlockSize(path), null);
+      int bufferLen = 1024;
+      byte[] toWrite = new byte[bufferLen];
+      Random rb = new Random(0);
+      long bytesToWrite = fileLen;
+      while (bytesToWrite > 0) {
+        rb.nextBytes(toWrite);
+        int bytesToWriteNext = (bufferLen < bytesToWrite) ? bufferLen : (int) bytesToWrite;
+        out.write(toWrite, 0, bytesToWriteNext);

Review Comment:
   Please add some comments to show that you just want to create a file which only contains one UC block.
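
   For example, a short comment above the write loop would make the intent explicit (wording is only a suggestion):
   ```java
   // Write fileLen bytes without closing the stream, so the file consists of
   // exactly one block that is still under construction (RBW) when it is read.
   ```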



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java:
##########
@@ -287,4 +300,69 @@ public void testReadWithoutPreferredCachingReplica() throws IOException {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testCreateBlockReaderWhenInvalidBlockTokenException() throws
+      IOException, InterruptedException, TimeoutException {
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
+    Configuration conf = new Configuration();
+    DFSClientFaultInjector oldFaultInjector = DFSClientFaultInjector.get();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      String file = "/testfile";
+      Path path = new Path(file);
+      long fileLen = 1024 * 64;
+      EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+      FSDataOutputStream out =  fs.create(path, FsPermission.getFileDefault(), createFlags,
+          fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, 4096), (short) 3,
+          fs.getDefaultBlockSize(path), null);

Review Comment:
   What's the default block size? 256MB? If so, please hardcode it.
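
   If the goal is just to guarantee that the 64KB file fits in a single block, hardcoding the block size in the create call would make that explicit; a rough sketch (the 256MB value here is only illustrative, not a claim about the actual dfs.blocksize default):
   ```java
   // Use an explicit block size so the 64KB test file is guaranteed to fit in
   // one block, independent of the cluster's dfs.blocksize setting.
   final long blockSize = 256 * 1024 * 1024L;
   FSDataOutputStream out = fs.create(path, FsPermission.getFileDefault(), createFlags,
       fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, 4096), (short) 3,
       blockSize, null);
   ```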



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java:
##########
@@ -520,6 +520,15 @@ private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache)
         // Update the LastLocatedBlock, if offset is for last block.
         if (offset >= locatedBlocks.getFileLength()) {
           setLocatedBlocksFields(newBlocks, getLastBlockLength(newBlocks));
+          // Here locatedBlocks has been updated, need to check offset again.
+          // If offset to the portion of the last block, will return the last block,
+          // otherwise the block containing the specified offset needs to be searched again.
+          if (offset >= locatedBlocks.getFileLength()) {

Review Comment:
   Makes sense. Please make the comment clearer, e.g.:
   ```
             /**
              * After updating the locatedBlocks, the block to which the offset belongs
              * should be searched again, as in {@link DFSInputStream#getBlockAt(long)}.
              */
   ```



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java:
##########
@@ -287,4 +300,69 @@ public void testReadWithoutPreferredCachingReplica() throws IOException {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testCreateBlockReaderWhenInvalidBlockTokenException() throws
+      IOException, InterruptedException, TimeoutException {
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
+    Configuration conf = new Configuration();
+    DFSClientFaultInjector oldFaultInjector = DFSClientFaultInjector.get();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      String file = "/testfile";
+      Path path = new Path(file);
+      long fileLen = 1024 * 64;
+      EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+      FSDataOutputStream out =  fs.create(path, FsPermission.getFileDefault(), createFlags,
+          fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, 4096), (short) 3,
+          fs.getDefaultBlockSize(path), null);
+      int bufferLen = 1024;
+      byte[] toWrite = new byte[bufferLen];
+      Random rb = new Random(0);
+      long bytesToWrite = fileLen;
+      while (bytesToWrite > 0) {
+        rb.nextBytes(toWrite);
+        int bytesToWriteNext = (bufferLen < bytesToWrite) ? bufferLen : (int) bytesToWrite;
+        out.write(toWrite, 0, bytesToWriteNext);
+        bytesToWrite -= bytesToWriteNext;
+      }
+
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return fs.getFileBlockLocations(path, 0, fileLen).length == 1;
+        } catch (IOException e) {
+          return false;
+        }
+      }, 100, 10000);
+
+      // Set up the InjectionHandler.
+      DFSClientFaultInjector.set(Mockito.mock(DFSClientFaultInjector.class));
+      DFSClientFaultInjector injector = DFSClientFaultInjector.get();
+      final AtomicInteger count = new AtomicInteger(0);
+      Mockito.doAnswer(new Answer<Void>() {
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          // Mock access token was invalid when connecting to first datanode
+          // throw InvalidBlockTokenException.
+          if (count.getAndIncrement() == 0) {
+            throw new InvalidBlockTokenException("Mock InvalidBlockTokenException");
+          }
+          return null;
+        }
+      }).when(injector).failCreateBlockReader();
+
+      try (DFSInputStream in = new DFSInputStream(fs.getClient(), file,
+          false, null)) {
+        int bufLen = 1024;
+        byte[] buf = new byte[bufLen];
+        //Seek the offset to 1024.
+        in.seek(1024);

Review Comment:
   add some comments to show that the offset should be in (0, fileSize).
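
   For example (the comment wording is only a suggestion):
   ```java
   // Seek to an offset strictly inside the file (0 < offset < fileLen), so the
   // read lands in the middle of the single under-construction block.
   in.seek(1024);
   ```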



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java:
##########
@@ -287,4 +300,69 @@ public void testReadWithoutPreferredCachingReplica() throws IOException {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testCreateBlockReaderWhenInvalidBlockTokenException() throws
+      IOException, InterruptedException, TimeoutException {
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
+    Configuration conf = new Configuration();
+    DFSClientFaultInjector oldFaultInjector = DFSClientFaultInjector.get();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      String file = "/testfile";
+      Path path = new Path(file);
+      long fileLen = 1024 * 64;
+      EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+      FSDataOutputStream out =  fs.create(path, FsPermission.getFileDefault(), createFlags,
+          fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, 4096), (short) 3,
+          fs.getDefaultBlockSize(path), null);
+      int bufferLen = 1024;
+      byte[] toWrite = new byte[bufferLen];
+      Random rb = new Random(0);
+      long bytesToWrite = fileLen;
+      while (bytesToWrite > 0) {

Review Comment:
   This logic is unnecessary if you just want to write 64KB of data. 2KB is enough, right? I see you just seek to 1024.
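
   A minimal sketch of the simplified version (the sizes are illustrative; the only requirement is that the file is larger than the 1024-byte seek offset):
   ```java
   // 2KB of data is enough: the test only seeks to offset 1024 within the block.
   byte[] toWrite = new byte[2048];
   new Random(0).nextBytes(toWrite);
   out.write(toWrite);
   ```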





> Fix Client throw IndexOutOfBoundsException in DFSInputStream#fetchBlockAt
> -------------------------------------------------------------------------
>
>                 Key: HDFS-17455
>                 URL: https://issues.apache.org/jira/browse/HDFS-17455
>             Project: Hadoop HDFS
>          Issue Type: Bug
>            Reporter: Haiyang Hu
>            Assignee: Haiyang Hu
>            Priority: Major
>              Labels: pull-request-available
>
> When the client reads data and connects to the datanode, the datanode access 
> token may be invalid at that moment, so an InvalidBlockTokenException is thrown. 
> The subsequent call to the fetchBlockAt method then throws 
> java.lang.IndexOutOfBoundsException, causing the read to fail.
> *Root cause:*
> * The HDFS file contains only one RBW block, with a block data size of 2048KB.
> * The client opens this file and seeks to the offset of 1024KB to read data.
> * The DFSInputStream#getBlockReader call connects to the datanode; because the 
> datanode access token is invalid at this time, it throws 
> InvalidBlockTokenException, and the subsequent DFSInputStream#fetchBlockAt call 
> throws java.lang.IndexOutOfBoundsException.
> {code:java}
> private synchronized DatanodeInfo blockSeekTo(long target)
>      throws IOException {
>    if (target >= getFileLength()) {
>    // the target size is smaller than fileLength (completeBlockSize + lastBlockBeingWrittenLength),
>    // here at this time target is 1024 and getFileLength is 2048
>      throw new IOException("Attempted to read past end of file");
>    }
>    ...
>    while (true) {
>      ...
>      try {
>        blockReader = getBlockReader(targetBlock, offsetIntoBlock,
>            targetBlock.getBlockSize() - offsetIntoBlock, targetAddr,
>            storageType, chosenNode);
>        if(connectFailedOnce) {
>          DFSClient.LOG.info("Successfully connected to " + targetAddr +
>                             " for " + targetBlock.getBlock());
>        }
>        return chosenNode;
>      } catch (IOException ex) {
>        ...
>        } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
>          refetchToken--;
>          // Here will catch InvalidBlockTokenException.
>          fetchBlockAt(target);
>        } else {
>          ...
>        }
>      }
>    }
>  }
> private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache)
>       throws IOException {
>     maybeRegisterBlockRefresh();
>     synchronized(infoLock) {
>       // Here the locatedBlocks only contains one locatedBlock, at this time the offset is 1024 and fileLength is 0,
>       // so the targetBlockIdx is -2
>       int targetBlockIdx = locatedBlocks.findBlock(offset);
>       if (targetBlockIdx < 0) { // block is not cached
>         targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
>         // Here the targetBlockIdx is 1;
>         useCache = false;
>       }
>       if (!useCache) { // fetch blocks
>         final LocatedBlocks newBlocks = (length == 0)
>             ? dfsClient.getLocatedBlocks(src, offset)
>             : dfsClient.getLocatedBlocks(src, offset, length);
>         if (newBlocks == null || newBlocks.locatedBlockCount() == 0) {
>           throw new EOFException("Could not find target position " + offset);
>         }
>         // Update the LastLocatedBlock, if offset is for last block.
>         if (offset >= locatedBlocks.getFileLength()) {
>           setLocatedBlocksFields(newBlocks, getLastBlockLength(newBlocks));
>         } else {
>           locatedBlocks.insertRange(targetBlockIdx,
>               newBlocks.getLocatedBlocks());
>         }
>       }
>       // Here the locatedBlocks only contains one locatedBlock, so will throw java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
>       return locatedBlocks.get(targetBlockIdx);
>     }
>   }
> {code}
> The client exception:
> {code:java}
> java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
>         at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>         at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>         at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:266)
>         at java.base/java.util.Objects.checkIndex(Objects.java:359)
>         at java.base/java.util.ArrayList.get(ArrayList.java:427)
>         at org.apache.hadoop.hdfs.protocol.LocatedBlocks.get(LocatedBlocks.java:87)
>         at org.apache.hadoop.hdfs.DFSInputStream.fetchBlockAt(DFSInputStream.java:569)
>         at org.apache.hadoop.hdfs.DFSInputStream.fetchBlockAt(DFSInputStream.java:540)
>         at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:704)
>         at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:884)
>         at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:957)
>         at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:804)
> {code}
> The datanode exception:
> {code:java}
> 2024-03-27 15:56:35,477 WARN  datanode.DataNode (DataXceiver.java:checkAccess(1487)) [DataXceiver for client DFSClient_NONMAPREDUCE_475786505_1 at /xxx [Sending block BP-xxx:blk_1138933918_65194340]] - Block token verification failed: op=READ_BLOCK, remoteAddress=/XXX, message=Can't re-compute password for block_token_identifier (expiryDate=1711562193469, keyId=1775816931, userId=test, blockPoolId=BP-xxx-xxx-xxx, blockId=1138933918, access modes=[READ], storageTypes= [SSD, SSD, SSD], storageIds= [DS-xxx1, DS-xxx2,DS-xxx3]), since the required block key (keyID=1775816931) doesn't exist
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
