ayushtkn commented on code in PR #6566: URL: https://github.com/apache/hadoop/pull/6566#discussion_r1509482598
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java:
##########
@@ -414,6 +414,10 @@ synchronized void markFirstNodeIfNotMarked() {
   }
 
   synchronized void adjustState4RestartingNode() {
+    if (restartingNodeIndex == -1) {
+      return;
+    }
+

Review Comment:
Why is this needed? There is logic below; doesn't that take care of things?
```
      if (!isRestartingNode()) {
        error = ErrorType.NONE;
      }
      badNodeIndex = -1;
    }
```
None of your tests fail for me without this change.


##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java:
##########
@@ -2259,4 +2282,4 @@ public String toString() {
     return extendedBlock == null ?
         "block==null" : "" + extendedBlock.getLocalBlock();
   }
-}
+}

Review Comment:
Nit: unrelated change, please avoid.


##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java:
##########
@@ -2651,5 +2653,147 @@ public void testNameNodeCreateSnapshotTrashRootOnStartup()
     }
   }
 
+  @Test
+  public void testSingleRackFailureDuringPipelineSetupMinReplicationPossible() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 2);
+    // 3 racks & 3 nodes. 1 per rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill one DN, so only 2 racks stays with active DN
+      cluster.stopDataNode(0);
+      // create a file with replication 3, for rack fault tolerant BPP,
+      // it should allocate nodes in all 3 racks.
+      DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
+    }
+  }
+
+  @Test
+  public void testSingleRackFailureDuringPipelineSetupMinReplicationImpossible() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 3);
+    // 3 racks & 3 nodes. 1 per rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill one DN, so only 2 racks stays with active DN
+      cluster.stopDataNode(0);
+      boolean threw = false;
+      try {
+        DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
+      } catch (IOException e) {
+        // success
+        threw = true;
+      }
+      assertTrue("Failed to throw IOE when creating a file with less " +
+          "DNs than required for min replication", threw);

Review Comment:
Use ``LambdaTestUtils.intercept``.
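For illustration only: the `threw` flag and the `assertTrue` above would collapse to roughly the following. This is a sketch, not part of the PR; it assumes the test class imports `java.io.IOException` and `org.apache.hadoop.test.LambdaTestUtils`, and it mirrors the snippet the reviewer spells out further down in this thread.
```
// Expect createFile to fail when min replication cannot be satisfied;
// intercept() fails the test if no IOException is thrown.
LambdaTestUtils.intercept(IOException.class,
    () -> DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L));
```
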
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java:
##########
@@ -2651,5 +2653,147 @@ public void testNameNodeCreateSnapshotTrashRootOnStartup()
     }
   }
 
+  @Test
+  public void testSingleRackFailureDuringPipelineSetupMinReplicationPossible() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 2);
+    // 3 racks & 3 nodes. 1 per rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill one DN, so only 2 racks stays with active DN
+      cluster.stopDataNode(0);
+      // create a file with replication 3, for rack fault tolerant BPP,
+      // it should allocate nodes in all 3 racks.
+      DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
+    }
+  }
+
+  @Test
+  public void testSingleRackFailureDuringPipelineSetupMinReplicationImpossible() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 3);
+    // 3 racks & 3 nodes. 1 per rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill one DN, so only 2 racks stays with active DN
+      cluster.stopDataNode(0);
+      boolean threw = false;
+      try {
+        DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
+      } catch (IOException e) {
+        // success
+        threw = true;
+      }
+      assertTrue("Failed to throw IOE when creating a file with less " +
+          "DNs than required for min replication", threw);
+    }
+  }
+
+  @Test
+  public void testMultipleRackFailureDuringPipelineSetupMinReplicationPossible() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 1);
+    // 3 racks & 3 nodes. 1 per rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill 2 DN, so only 1 racks stays with active DN
+      cluster.stopDataNode(0);
+      cluster.stopDataNode(1);
+      // create a file with replication 3, for rack fault tolerant BPP,
+      // it should allocate nodes in all 3 racks.
+      DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
+    }
+  }
+
+  @Test
+  public void testMultipleRackFailureDuringPipelineSetupMinReplicationImpossible()
+      throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 2);
+    // 3 racks & 3 nodes. 1 per rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill 2 DN, so only 1 rack stays with active DN
+      cluster.stopDataNode(0);
+      cluster.stopDataNode(1);
+      boolean threw = false;
+      try {
+        DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
+      } catch (IOException e) {
+        // success
+        threw = true;
+      }
+      assertTrue("Failed to throw IOE when creating a file with less " +
+          "DNs than required for min replication", threw);
+    }

Review Comment:
Use ``LambdaTestUtils.intercept``.


##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java:
##########
@@ -966,8 +970,8 @@ void waitForAckedSeqno(long seqno) throws IOException {
         long duration = Time.monotonicNowNanos() - begin;
         if (TimeUnit.NANOSECONDS.toMillis(duration) > writeTimeout) {
           LOG.error("No ack received, took {}ms (threshold={}ms). "
-              + "File being written: {}, block: {}, "
-              + "Write pipeline datanodes: {}.",
+              + "File being written: {}, block: {}, "

Review Comment:
Nit: unrelated change, please avoid.


##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java:
##########
@@ -1525,8 +1529,8 @@ private void addDatanode2ExistingPipeline() throws IOException {
     // MIN_REPLICATION is set to 0 or less than zero, an exception will be
     // thrown if a replacement could not be found.
 
-    if (dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 && nodes.length
-        >= dfsClient.dtpReplaceDatanodeOnFailureReplication) {
+    if (dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 &&
+        nodes.length >= dfsClient.dtpReplaceDatanodeOnFailureReplication) {

Review Comment:
Nit: avoid this unrelated change.


##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java:
##########
@@ -111,6 +111,7 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
       final DatanodeInfo badNode = nodes[getErrorState().getBadNodeIndex()];
       LOG.warn("Excluding datanode " + badNode);
       excludedNodes.put(badNode, badNode);
+      setPipeline(null, null, null);

Review Comment:
Why is this required? We are throwing anyway; is it because we called ``setPipeline(lb);`` above? Why don't we call it just before the ``return`` statement? Originally we return this `lb` and set the pipeline after that, so we could do this just before ``return lb``?
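To make the question concrete, here is a rough ordering sketch of what the comment seems to suggest for the tail of `nextBlockOutputStream()`. Only the identifiers visible in the diff above are taken from the PR; the surrounding method body, the `storageTypes`/`storageIDs` argument names and the exception message are placeholders, so treat this as an assumption rather than the actual code.
```
      // ... earlier part of nextBlockOutputStream() unchanged ...
      success = createBlockOutputStream(nodes, storageTypes, storageIDs,
          0L, false);
      if (!success) {
        final DatanodeInfo badNode = nodes[getErrorState().getBadNodeIndex()];
        LOG.warn("Excluding datanode " + badNode);
        excludedNodes.put(badNode, badNode);
        // the pipeline was never set for this attempt, so no
        // setPipeline(null, null, null) reset is needed here
        throw new IOException("Unable to create new block.");  // placeholder message
      }
      // set the pipeline only once the stream is known to be good,
      // immediately before handing the block back
      setPipeline(lb);
      return lb;
```
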
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java:
##########
@@ -89,6 +89,10 @@ public void testExcludedNodesForgiveness() throws IOException {
     conf.setLong(
         HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY,
         2500);
+    // Set min replication for blocks to be written as 1.
+    conf.setInt(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION,
+        1);

Review Comment:
Why are we touching the existing tests? They should pass without any new config change; otherwise this would be an incompatible change for existing clients.


##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java:
##########
@@ -2651,5 +2653,147 @@ public void testNameNodeCreateSnapshotTrashRootOnStartup()
     }
   }
 
+  @Test
+  public void testSingleRackFailureDuringPipelineSetupMinReplicationPossible() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 2);
+    // 3 racks & 3 nodes. 1 per rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill one DN, so only 2 racks stays with active DN
+      cluster.stopDataNode(0);
+      // create a file with replication 3, for rack fault tolerant BPP,
+      // it should allocate nodes in all 3 racks.
+      DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
+    }
+  }
+
+  @Test
+  public void testSingleRackFailureDuringPipelineSetupMinReplicationImpossible() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 3);
+    // 3 racks & 3 nodes. 1 per rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill one DN, so only 2 racks stays with active DN
+      cluster.stopDataNode(0);
+      boolean threw = false;
+      try {
+        DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
+      } catch (IOException e) {
+        // success
+        threw = true;
+      }
+      assertTrue("Failed to throw IOE when creating a file with less " +
+          "DNs than required for min replication", threw);
+    }
+  }
+
+  @Test
+  public void testMultipleRackFailureDuringPipelineSetupMinReplicationPossible() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 1);
+    // 3 racks & 3 nodes. 1 per rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill 2 DN, so only 1 racks stays with active DN
+      cluster.stopDataNode(0);
+      cluster.stopDataNode(1);
+      // create a file with replication 3, for rack fault tolerant BPP,
+      // it should allocate nodes in all 3 racks.
+      DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
+    }
+  }
+
+  @Test
+  public void testMultipleRackFailureDuringPipelineSetupMinReplicationImpossible()
+      throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 2);
+    // 3 racks & 3 nodes. 1 per rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill 2 DN, so only 1 rack stays with active DN
+      cluster.stopDataNode(0);
+      cluster.stopDataNode(1);
+      boolean threw = false;
+      try {
+        DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
+      } catch (IOException e) {
+        // success
+        threw = true;
+      }
+      assertTrue("Failed to throw IOE when creating a file with less " +
+          "DNs than required for min replication", threw);
+    }
+  }
+
+  @Test
+  public void testAllRackFailureDuringPipelineSetup() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    // 3 racks & 3 nodes. 1 per rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // shutdown all DNs
+      cluster.shutdownDataNodes();
+      // create a file with replication 3, for rack fault tolerant BPP,
+      // it should allocate nodes in all 3 rack but fail because no DNs are present.
+      boolean threw = false;
+      try {
+        DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
+      } catch (IOException e) {
+        // success
+        threw = true;
+      }

Review Comment:
```
LambdaTestUtils.intercept(IOException.class,
    () -> DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L));
```


##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java:
##########
@@ -1817,10 +1839,10 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
       nodes = lb.getLocations();
       nextStorageTypes = lb.getStorageTypes();
       nextStorageIDs = lb.getStorageIDs();
-
+      setPipeline(lb);
       // Connect to first DataNode in the list.
       success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
-          0L, false);
+          0L, false) || setupPipelineForAppendOrRecovery();

Review Comment:
With this, if the first attempt to create a stream with 3 nodes fails, you immediately fall back to dropping the datanode. Shouldn't we try to get a node replacement as before, and only if those attempts fail fall back to the dropping logic? So, if there is a problem with just one node (not a whole rack), before this change the stream would have got a replacement, but now it won't? If I understand it correctly, that is a behaviour change, right?


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org