This is an automated email from the ASF dual-hosted git repository.

surendralilhore pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new aa1c795  HDFS-14384. When the lastLocatedBlock token expires, it will take 1~3 seconds to refetch it. Contributed by Surendra Singh Lilhore.
aa1c795 is described below

commit aa1c795dc94b9d935a2e7e83445570729d4a0d4c
Author: Surendra Singh Lilhore <surendralilh...@apache.org>
AuthorDate: Wed Nov 6 19:28:55 2019 +0530

    HDFS-14384. When the lastLocatedBlock token expires, it will take 1~3 seconds to refetch it. Contributed by Surendra Singh Lilhore.
    
    (cherry picked from commit c36014165c212b26d75268ee3659aa2cadcff349)
---
 .../org/apache/hadoop/hdfs/DFSInputStream.java     | 18 ++++++--
 .../hdfs/security/token/block/TestBlockToken.java  | 49 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 3 deletions(-)
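
In outline, the patch extracts the last-block-length computation out of fetchLocatedBlocksAndGetLastBlockLength() into a private getLastBlockLength() helper, so that fetchBlockAt() can recompute the in-progress block's length (and pick up its freshly issued token) after refreshing the block list. A minimal sketch of the extracted helper, with the retry handling of the committed version elided:

    // Sketch only: the shape of the extracted helper. The committed
    // version (first hunk below) also retries and handles the case where
    // no DataNode can report the length.
    private long getLastBlockLength() throws IOException {
      long lastBlockBeingWrittenLength = 0;
      if (!locatedBlocks.isLastBlockComplete()) {
        final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
        if (last != null) {
          // ask a DataNode for the visible length of the in-progress
          // block, authenticating with the token carried by this
          // LocatedBlock
          lastBlockBeingWrittenLength = readBlockLength(last);
        }
      }
      return lastBlockBeingWrittenLength;
    }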

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 52ed1d4..c7a0253 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -255,6 +255,13 @@ public class DFSInputStream extends FSInputStream
       }
     }
     locatedBlocks = newInfo;
+    long lastBlkBeingWrittenLength = getLastBlockLength();
+    fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
+
+    return lastBlkBeingWrittenLength;
+  }
+
+  private long getLastBlockLength() throws IOException {
     long lastBlockBeingWrittenLength = 0;
     if (!locatedBlocks.isLastBlockComplete()) {
       final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
@@ -273,8 +280,6 @@ public class DFSInputStream extends FSInputStream
       }
     }
 
-    fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
-
     return lastBlockBeingWrittenLength;
   }
 
@@ -457,7 +462,14 @@ public class DFSInputStream extends FSInputStream
         if (newBlocks == null || newBlocks.locatedBlockCount() == 0) {
           throw new EOFException("Could not find target position " + offset);
         }
-        locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+        // Update the last located block if the offset targets the last block.
+        if (offset >= locatedBlocks.getFileLength()) {
+          locatedBlocks = newBlocks;
+          lastBlockBeingWrittenLength = getLastBlockLength();
+        } else {
+          locatedBlocks.insertRange(targetBlockIdx,
+              newBlocks.getLocatedBlocks());
+        }
       }
       return locatedBlocks.get(targetBlockIdx);
     }
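
Taken together, the hunks above boil down to the following branch in fetchBlockAt(): when the requested offset is at or beyond the currently known file length, the read targets the in-progress last block, so the cached LocatedBlocks is replaced wholesale and the last block's length recomputed, instead of splicing the newly fetched blocks into the stale list. Condensed restatement (identifiers as in the hunks; the surrounding method is elided):

    // Replacing locatedBlocks outright means the last LocatedBlock now
    // carries a fresh block token, so the read no longer hits the
    // InvalidToken path that fell into refetchLocations() and its
    // randomized 1~3s backoff.
    if (offset >= locatedBlocks.getFileLength()) {
      locatedBlocks = newBlocks;
      lastBlockBeingWrittenLength = getLastBlockLength();
    } else {
      locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
    }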
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
index 1473fa6..fe5ee65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
@@ -34,6 +34,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.DataOutput;
 import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.EnumSet;
@@ -44,12 +45,14 @@ import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -64,6 +67,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetRep
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
 import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.TestWritable;
@@ -92,6 +97,7 @@ import org.mockito.stubbing.Answer;
 import com.google.protobuf.BlockingService;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
+
 import org.apache.hadoop.fs.StorageType;
 
 /** Unit tests for block tokens */
@@ -887,4 +893,47 @@ public class TestBlockToken {
         new DataInputStream(new ByteArrayInputStream(masterId.getBytes())));
     assertArrayEquals(password, sm.retrievePassword(slaveId));
   }
+
+  /** Test for last in-progress block token expiry.
+   * 1. Write a file with one block that is in progress.
+   * 2. Open an input stream, then close the output stream.
+   * 3. Wait for the block token to expire and read the data.
+   * 4. The read should succeed.
+   */
+  @Test
+  public void testLastLocatedBlockTokenExpiry()
+      throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build()) {
+      cluster.waitClusterUp();
+      final NameNode nn = cluster.getNameNode();
+      final BlockManager bm = nn.getNamesystem().getBlockManager();
+      final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
+
+      // set a short token lifetime (1 second)
+      SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
+
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path p = new Path("/tmp/abc.log");
+      FSDataOutputStream out = fs.create(p);
+      byte[] data = "hello\n".getBytes(StandardCharsets.UTF_8);
+      out.write(data);
+      out.hflush();
+      FSDataInputStream in = fs.open(p);
+      out.close();
+
+      // wait for last block token to expire
+      Thread.sleep(2000L);
+
+      byte[] readData = new byte[data.length];
+      long startTime = System.currentTimeMillis();
+      in.read(readData);
+      // DFSInputStream#refetchLocations() waits at least 1 second before
+      // refetching the complete located blocks.
+      assertTrue("Read should not wait to refetch complete located blocks",
+          1000L > (System.currentTimeMillis() - startTime));
+    }
+  }
 }
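
The 1000 ms bound in the assertion reflects the failure mode being fixed: with a stale token on the last block, the read used to fall into DFSInputStream#refetchLocations(), which sleeps for a randomized window before retrying (timeWindow comes from dfs.client.retry.window.base). Schematically, from that retry logic (not part of this patch):

    // Shape of the wait computation in refetchLocations(). On the first
    // failure (failures == 0) the sleep is up to one timeWindow, which is
    // where the 1~3s in the commit subject comes from.
    double waitTime = timeWindow * failures +
        timeWindow * (failures + 1) * ThreadLocalRandom.current().nextDouble();
    Thread.sleep((long) waitTime);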

