[ https://issues.apache.org/jira/browse/HDFS-17455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835114#comment-17835114 ]
ASF GitHub Bot commented on HDFS-17455:
---------------------------------------

ZanderXu commented on code in PR #6710:
URL: https://github.com/apache/hadoop/pull/6710#discussion_r1556772812

##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java:
##########
@@ -287,4 +300,69 @@ public void testReadWithoutPreferredCachingReplica() throws IOException {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testCreateBlockReaderWhenInvalidBlockTokenException() throws
+      IOException, InterruptedException, TimeoutException {
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
+    Configuration conf = new Configuration();
+    DFSClientFaultInjector oldFaultInjector = DFSClientFaultInjector.get();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      String file = "/testfile";
+      Path path = new Path(file);
+      long fileLen = 1024 * 64;
+      EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+      FSDataOutputStream out = fs.create(path, FsPermission.getFileDefault(), createFlags,

Review Comment:
   This `out` should be closed in the `finally` logic, right?

##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java:
##########
@@ -287,4 +300,69 @@ public void testReadWithoutPreferredCachingReplica() throws IOException {
...
+      int bufferLen = 1024;
+      byte[] toWrite = new byte[bufferLen];
+      Random rb = new Random(0);
+      long bytesToWrite = fileLen;
+      while (bytesToWrite > 0) {
+        rb.nextBytes(toWrite);
+        int bytesToWriteNext = (bufferLen < bytesToWrite) ? bufferLen : (int) bytesToWrite;
+        out.write(toWrite, 0, bytesToWriteNext);

Review Comment:
   Please add a comment explaining that you just want to create a file that contains only one under-construction (UC) block.
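Taken together, the two comments above suggest a structure along the lines of the sketch below: reuse the test's names, keep the writer open while the reads run, and close it deterministically. `hflush` and `org.apache.hadoop.io.IOUtils.closeStream` are standard Hadoop APIs, but their exact placement here is an assumption, not the PR's actual code:

```java
// Sketch: keep the writer open while the read assertions run so the file's
// single block stays under construction, and close it in `finally`.
FSDataOutputStream out = null;
try {
  out = fs.create(path, FsPermission.getFileDefault(), createFlags,
      fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, 4096), (short) 3,
      fs.getDefaultBlockSize(path), null);
  // Write fileLen bytes of random data WITHOUT closing the stream, so that
  // the file consists of exactly one under-construction (UC) block.
  byte[] toWrite = new byte[1024];
  Random rb = new Random(0);
  for (long remaining = fileLen; remaining > 0; ) {
    rb.nextBytes(toWrite);
    int n = (int) Math.min(toWrite.length, remaining);
    out.write(toWrite, 0, n);
    remaining -= n;
  }
  out.hflush(); // make the data visible to readers while the block is RBW
  // ... the seek/read assertions from the test would run here ...
} finally {
  IOUtils.closeStream(out); // the reviewer's point: always release the writer
}
```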
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java:
##########
@@ -287,4 +300,69 @@ public void testReadWithoutPreferredCachingReplica() throws IOException {
...
+      FSDataOutputStream out = fs.create(path, FsPermission.getFileDefault(), createFlags,
+          fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, 4096), (short) 3,
+          fs.getDefaultBlockSize(path), null);

Review Comment:
   What's the default block size? 256MB? If so, please hardcode it.

##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java:
##########
@@ -520,6 +520,15 @@ private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache)
       // Update the LastLocatedBlock, if offset is for last block.
       if (offset >= locatedBlocks.getFileLength()) {
         setLocatedBlocksFields(newBlocks, getLastBlockLength(newBlocks));
+        // Here locatedBlocks has been updated, so the offset needs to be checked again.
+        // If the offset falls within the last block, the last block is returned;
+        // otherwise the block containing the specified offset must be searched for again.
+        if (offset >= locatedBlocks.getFileLength()) {

Review Comment:
   Makes sense. Please make the comments clearer:
   ```
   /**
    * After updating the locatedBlocks, the block to which the offset belongs
    * should be re-searched, like {@link DFSInputStream#getBlockAt(long)}.
    */
   ```
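For readers following along, the branch under discussion could end up looking roughly like the sketch below once the offset is re-checked. It mirrors the re-search logic of `DFSInputStream#getBlockAt(long)` using the existing helpers (`findBlock`, `LocatedBlocks.getInsertIndex`, `getLastLocatedBlock`), but it is an illustrative sketch, not necessarily the PR's final code:

```java
// Abridged sketch of the fixed branch in DFSInputStream#fetchBlockAt
// (illustrative; the PR's exact code may differ).
if (offset >= locatedBlocks.getFileLength()) {
  setLocatedBlocksFields(newBlocks, getLastBlockLength(newBlocks));
  /**
   * After updating the locatedBlocks, the block to which the offset belongs
   * should be re-searched, like {@link DFSInputStream#getBlockAt(long)}.
   */
  if (offset >= locatedBlocks.getFileLength()) {
    // The offset still lies in the last (possibly under-construction) block.
    return locatedBlocks.getLastLocatedBlock();
  }
  // The refreshed file length now covers the offset, so recompute the index
  // instead of reusing the stale targetBlockIdx, which may be out of bounds.
  targetBlockIdx = locatedBlocks.findBlock(offset);
  if (targetBlockIdx < 0) {
    targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
  }
} else {
  locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
}
return locatedBlocks.get(targetBlockIdx);
```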
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java:
##########
@@ -287,4 +300,69 @@ public void testReadWithoutPreferredCachingReplica() throws IOException {
...
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return fs.getFileBlockLocations(path, 0, fileLen).length == 1;
+        } catch (IOException e) {
+          return false;
+        }
+      }, 100, 10000);
+
+      // Set up the InjectionHandler.
+      DFSClientFaultInjector.set(Mockito.mock(DFSClientFaultInjector.class));
+      DFSClientFaultInjector injector = DFSClientFaultInjector.get();
+      final AtomicInteger count = new AtomicInteger(0);
+      Mockito.doAnswer(new Answer<Void>() {
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          // Mock an invalid access token when connecting to the first datanode
+          // by throwing InvalidBlockTokenException.
+          if (count.getAndIncrement() == 0) {
+            throw new InvalidBlockTokenException("Mock InvalidBlockTokenException");
+          }
+          return null;
+        }
+      }).when(injector).failCreateBlockReader();
+
+      try (DFSInputStream in = new DFSInputStream(fs.getClient(), file,
+          false, null)) {
+        int bufLen = 1024;
+        byte[] buf = new byte[bufLen];
+        // Seek the offset to 1024.
+        in.seek(1024);

Review Comment:
   Add a comment noting that the offset should be in (0, fileSize).

##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java:
##########
@@ -287,4 +300,69 @@ public void testReadWithoutPreferredCachingReplica() throws IOException {
...
+      long bytesToWrite = fileLen;
+      while (bytesToWrite > 0) {

Review Comment:
   This logic is unnecessary if you just want to write 64KB of data. 2KB is enough, right? I see you only seek to 1024.

> Fix Client throw IndexOutOfBoundsException in DFSInputStream#fetchBlockAt
> -------------------------------------------------------------------------
>
>                 Key: HDFS-17455
>                 URL: https://issues.apache.org/jira/browse/HDFS-17455
>             Project: Hadoop HDFS
>          Issue Type: Bug
>            Reporter: Haiyang Hu
>            Assignee: Haiyang Hu
>            Priority: Major
>              Labels: pull-request-available
>
> When the client reads data and connects to a datanode whose access token has
> become invalid, the datanode throws InvalidBlockTokenException. The subsequent
> call to the fetchBlockAt method then throws
> java.lang.IndexOutOfBoundsException, causing the read to fail.
> *Root cause:*
> * The HDFS file contains only one RBW block, with a block data size of 2048KB.
> * The client opens this file and seeks to the offset of 1024KB to read data.
> * The client calls DFSInputStream#getBlockReader to connect to the datanode;
> because the datanode's access token is invalid at this point, it throws
> InvalidBlockTokenException, and the subsequent call to
> DFSInputStream#fetchBlockAt throws java.lang.IndexOutOfBoundsException.
> {code:java}
> private synchronized DatanodeInfo blockSeekTo(long target)
>     throws IOException {
>   if (target >= getFileLength()) {
>     // This check passes: target (1024) is smaller than getFileLength()
>     // (completedBlockSize + lastBlockBeingWrittenLength = 2048),
>     // so no exception is thrown here.
>     throw new IOException("Attempted to read past end of file");
>   }
>   ...
>   while (true) {
>     ...
>     try {
>       blockReader = getBlockReader(targetBlock, offsetIntoBlock,
>           targetBlock.getBlockSize() - offsetIntoBlock, targetAddr,
>           storageType, chosenNode);
>       if (connectFailedOnce) {
>         DFSClient.LOG.info("Successfully connected to " + targetAddr +
>             " for " + targetBlock.getBlock());
>       }
>       return chosenNode;
>     } catch (IOException ex) {
>       ...
>       } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
>         refetchToken--;
>         // The InvalidBlockTokenException is caught here.
>         fetchBlockAt(target);
>       } else {
>         ...
>       }
>     }
>   }
> }
>
> private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache)
>     throws IOException {
>   maybeRegisterBlockRefresh();
>   synchronized (infoLock) {
>     // Here locatedBlocks contains only one locatedBlock; at this point the
>     // offset is 1024 and the cached file length is 0, so findBlock returns -2.
>     int targetBlockIdx = locatedBlocks.findBlock(offset);
>     if (targetBlockIdx < 0) { // block is not cached
>       targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
>       // Here targetBlockIdx becomes 1.
>       useCache = false;
>     }
>     if (!useCache) { // fetch blocks
>       final LocatedBlocks newBlocks = (length == 0)
>           ? dfsClient.getLocatedBlocks(src, offset)
>           : dfsClient.getLocatedBlocks(src, offset, length);
>       if (newBlocks == null || newBlocks.locatedBlockCount() == 0) {
>         throw new EOFException("Could not find target position " + offset);
>       }
>       // Update the LastLocatedBlock, if offset is for last block.
>       if (offset >= locatedBlocks.getFileLength()) {
>         setLocatedBlocksFields(newBlocks, getLastBlockLength(newBlocks));
>       } else {
>         locatedBlocks.insertRange(targetBlockIdx,
>             newBlocks.getLocatedBlocks());
>       }
>     }
>     // Here locatedBlocks still contains only one locatedBlock, but
>     // targetBlockIdx is 1, so this throws java.lang.IndexOutOfBoundsException:
>     // Index 1 out of bounds for length 1.
>     return locatedBlocks.get(targetBlockIdx);
>   }
> }
> {code}
> The client exception:
> {code:java}
> java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
>   at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>   at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>   at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:266)
>   at java.base/java.util.Objects.checkIndex(Objects.java:359)
>   at java.base/java.util.ArrayList.get(ArrayList.java:427)
>   at org.apache.hadoop.hdfs.protocol.LocatedBlocks.get(LocatedBlocks.java:87)
>   at org.apache.hadoop.hdfs.DFSInputStream.fetchBlockAt(DFSInputStream.java:569)
>   at org.apache.hadoop.hdfs.DFSInputStream.fetchBlockAt(DFSInputStream.java:540)
>   at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:704)
>   at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:884)
>   at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:957)
>   at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:804)
> {code}
> The datanode exception:
> {code:java}
> 2024-03-27 15:56:35,477 WARN datanode.DataNode (DataXceiver.java:checkAccess(1487))
> [DataXceiver for client DFSClient_NONMAPREDUCE_475786505_1 at /xxx
> [Sending block BP-xxx:blk_1138933918_65194340]] - Block token verification failed:
> op=READ_BLOCK, remoteAddress=/XXX, message=Can't re-compute password for
> block_token_identifier (expiryDate=1711562193469, keyId=1775816931, userId=test,
> blockPoolId=BP-xxx-xxx-xxx, blockId=1138933918, access modes=[READ],
> storageTypes=[SSD, SSD, SSD], storageIds=[DS-xxx1, DS-xxx2, DS-xxx3]),
> since the required block key (keyID=1775816931) doesn't exist
> {code}
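As a worked example of the arithmetic in the walkthrough above, the following self-contained snippet reproduces the failing index computation outside HDFS. The plain list of block start offsets stands in for locatedBlocks, Collections.binarySearch plays the role of findBlock, and getInsertIndex mirrors LocatedBlocks.getInsertIndex; none of this is HDFS code.

{code:java}
import java.util.Collections;
import java.util.List;

public class FetchBlockAtArithmetic {

  // Mirrors LocatedBlocks.getInsertIndex: Collections.binarySearch encodes
  // a miss as -(insertionPoint + 1).
  static int getInsertIndex(int binSearchResult) {
    return -(binSearchResult + 1);
  }

  public static void main(String[] args) {
    // The cached block list: one RBW block starting at offset 0 whose
    // visible length is 0, so the cached file length is also 0.
    List<Long> blockStartOffsets = List.of(0L);

    // findBlock(1024): offset 1024 is past the only (zero-length) block,
    // so the search misses with insertion point 1 and returns -2.
    int binSearchResult = Collections.binarySearch(blockStartOffsets, 1024L);
    System.out.println("binarySearch result = " + binSearchResult); // -2

    int targetBlockIdx = getInsertIndex(binSearchResult);
    System.out.println("targetBlockIdx = " + targetBlockIdx);       // 1

    // fetchBlockAt then calls locatedBlocks.get(targetBlockIdx) without
    // re-checking the index against the refreshed one-element block list:
    blockStartOffsets.get(targetBlockIdx); // Index 1 out of bounds for length 1
  }
}
{code}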