This is an automated email from the ASF dual-hosted git repository. shashikant pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push: new e4f23ee HDDS-3477. Disable partial chunk write during flush() call in ozone client by default. (#957) e4f23ee is described below commit e4f23ee243a12354e326a959f40aab38e48dbce0 Author: micah zhao <micahz...@tencent.com> AuthorDate: Wed Jun 3 12:26:40 2020 +0800 HDDS-3477. Disable partial chunk write during flush() call in ozone client by default. (#957) --- .../org/apache/hadoop/ozone/OzoneConfigKeys.java | 4 +- .../common/src/main/resources/ozone-default.xml | 10 +- .../apache/hadoop/ozone/client/rpc/RpcClient.java | 2 +- .../ozone/client/rpc/TestBlockOutputStream.java | 2 + ...m.java => TestBlockOutputStreamFlushDelay.java} | 62 +++--- .../rpc/TestBlockOutputStreamWithFailures.java | 2 + ...stBlockOutputStreamWithFailuresFlushDelay.java} | 33 ++- .../client/rpc/TestContainerStateMachine.java | 2 + ...va => TestContainerStateMachineFlushDelay.java} | 107 +++------- .../client/rpc/TestFailureHandlingByClient.java | 3 +- .../rpc/TestFailureHandlingByClientFlushDelay.java | 235 +++++++++++++++++++++ .../rpc/TestOzoneClientRetriesOnException.java | 2 + ...stOzoneClientRetriesOnExceptionFlushDelay.java} | 148 ++++++------- 13 files changed, 413 insertions(+), 199 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 841ffc5..281f185 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -157,8 +157,8 @@ public final class OzoneConfigKeys { * */ public static final String OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY = "ozone.client.stream.buffer.flush.delay"; - public static final boolean OOZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT = - false; + public static final boolean OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT = + true; // This defines 
the overall connection limit for the connection pool used in // RestClient. diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 26bf2ba..93292d8 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -390,11 +390,13 @@ </property> <property> <name>ozone.client.stream.buffer.flush.delay</name> - <value>false</value> + <value>true</value> <tag>OZONE, CLIENT</tag> - <description>If set true, when call flush() and determine whether the - data in the current buffer is greater than ozone.client.stream.buffer.size. - if greater than then send buffer to the datanode. + <description> + If true (the default), flush() sends the buffered data to the datanode + only when the data in the current buffer is greater than + ozone.client.stream.buffer.size; otherwise the data stays buffered. + Set this to false to send the buffered data on every flush() call. </description> </property> <property> diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 57dfb0d..3c333bd 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -202,7 +202,7 @@ public class RpcClient implements ClientProtocol { StorageUnit.BYTES); streamBufferFlushDelay = conf.getBoolean( OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, - OzoneConfigKeys.OOZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT); + OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT); streamBufferMaxSize = (long) conf .getStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE, OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT, diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java index 338e286..44b47dc 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java @@ -48,6 +48,7 @@ import org.junit.rules.Timeout; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY; /** * Tests BlockOutputStream class. @@ -90,6 +91,7 @@ public class TestBlockOutputStream { conf.setQuietMode(false); conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, StorageUnit.MB); + conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(7) .setTotalPipelineNumLimit(10) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java similarity index 95% copy from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java copy to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java index 338e286..93a3ad6 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java @@ -50,13 +50,13 @@ import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; /** - * Tests BlockOutputStream class. + * Tests BlockOutputStream class with the flush delay enabled (the default). */ -public class TestBlockOutputStream { +public class TestBlockOutputStreamFlushDelay { /** - * Set a timeout for each test. - */ + * Set a timeout for each test. + */ @Rule public Timeout timeout = new Timeout(300000); private static MiniOzoneCluster cluster; @@ -140,6 +140,7 @@ public class TestBlockOutputStream { String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); int dataLength = 50; + int totalWriteDataLength = dataLength * 2; byte[] data1 = ContainerTestHelper.getFixedLengthString(keyString, dataLength) .getBytes(UTF_8); @@ -174,7 +175,9 @@ public class TestBlockOutputStream { // commitIndex2FlushedData Map will be empty here Assert.assertTrue( blockOutputStream.getCommitIndex2flushedDataMap().isEmpty()); - + // Write again so the total data written is at least one chunk in + // size, which makes sure the flush below syncs the data. + key.write(data1); // Now do a flush. This will flush the data and update the flush length and // the map. 
key.flush(); @@ -190,14 +193,16 @@ public class TestBlockOutputStream { Assert.assertEquals(1, blockOutputStream.getBufferPool().getSize()); Assert.assertEquals(0, blockOutputStream.getBufferPool().getBuffer(0).position()); - Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); - Assert.assertEquals(dataLength, + Assert.assertEquals(totalWriteDataLength, + blockOutputStream.getWrittenDataLength()); + Assert.assertEquals(totalWriteDataLength, blockOutputStream.getTotalDataFlushedLength()); Assert.assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size()); // flush ensures watchForCommit updates the total length acknowledged - Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(totalWriteDataLength, + blockOutputStream.getTotalAckDataLength()); Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); // now close the stream, It will update the ack length after watchForCommit @@ -217,7 +222,8 @@ public class TestBlockOutputStream { // make sure the bufferPool is empty Assert .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); + Assert.assertEquals(totalWriteDataLength, + blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); validateData(keyName, data1); @@ -271,31 +277,31 @@ public class TestBlockOutputStream { Assert.assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size()); - // Now do a flush. This will flush the data and update the flush length and - // the map. + // Now do a flush. 
key.flush(); Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); - // flush is a sync call, all pending operations will complete - Assert.assertEquals(pendingWriteChunkCount, metrics + // The data written so far equals flushSize, so this flush triggers + // no further action. + Assert.assertEquals(pendingWriteChunkCount + 2, metrics .getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); - Assert.assertEquals(pendingPutBlockCount, metrics + Assert.assertEquals(pendingPutBlockCount + 1, metrics .getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); // Since the data in the buffer is already flushed, flush here will have // no impact on the counters and data structures - Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize()); - Assert - .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); - Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); + // This flush triggers no action, so the BlockOutputStream state is left + // unchanged. + Assert.assertEquals(dataLength, + blockOutputStream.getBufferPool().computeBufferData()); + Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); Assert.assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength()); Assert.assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size()); + Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength()); - // flush ensures watchForCommit updates the total length acknowledged - Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); // now close the stream, It will update the ack length after watchForCommit key.close(); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); @@ -650,8 +656,7 @@ public class TestBlockOutputStream { Assert.assertTrue( blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1); - // Now do a flush. 
This will flush the data and update the flush length and - // the map. + // Now do a flush. key.flush(); Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(pendingWriteChunkCount, metrics @@ -664,10 +669,10 @@ public class TestBlockOutputStream { Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize()); Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength()); - - Assert.assertEquals(dataLength, + // dataLength = maxFlushSize + 50; the trailing 50 bytes are smaller than + // the chunkSize (100), so flush() does not sync them. + Assert.assertEquals(dataLength - 50, blockOutputStream.getTotalDataFlushedLength()); - // flush will make sure one more entry gets updated in the map Assert.assertTrue( blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2); @@ -683,9 +688,10 @@ public class TestBlockOutputStream { .getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); Assert.assertEquals(writeChunkCount + 5, metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); - Assert.assertEquals(putBlockCount + 4, + // The previous flush did not trigger any action. 
+ Assert.assertEquals(putBlockCount + 3, metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); - Assert.assertEquals(totalOpCount + 9, + Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); @@ -694,7 +700,7 @@ public class TestBlockOutputStream { } private OzoneOutputStream createKey(String keyName, ReplicationType type, - long size) throws Exception { + long size) throws Exception { return TestHelper .createKey(keyName, type, size, objectStore, volumeName, bucketName); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java index 7e327bc..e377615 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java @@ -56,6 +56,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY; /** * Tests failure detection and handling in BlockOutputStream Class. 
@@ -119,6 +120,7 @@ public class TestBlockOutputStreamWithFailures { "watch.request.timeout", 3, TimeUnit.SECONDS); conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 15); + conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false); cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7) .setTotalPipelineNumLimit(10).setBlockSize(blockSize) .setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java similarity index 98% copy from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java copy to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java index 7e327bc..6a849ac 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.ratis.RatisHelper; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.XceiverClientRatis; @@ -38,29 +39,23 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.TestHelper; import org.apache.ratis.protocol.GroupMismatchException; -import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.ratis.protocol.RaftRetryFailureException; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.junit.*; +import org.junit.rules.Timeout; import java.io.IOException; import java.io.OutputStream; import java.util.UUID; import java.util.concurrent.TimeUnit; -import org.junit.Rule; -import org.junit.rules.Timeout; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*; /** - * Tests failure detection and handling in BlockOutputStream Class. + * Tests failure detection and handling in BlockOutputStream Class with the + * flush delay enabled. */ -public class TestBlockOutputStreamWithFailures { +public class TestBlockOutputStreamWithFailuresFlushDelay { /** * Set a timeout for each test. 
@@ -156,7 +151,7 @@ public class TestBlockOutputStreamWithFailures { throws Exception { String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); - int dataLength = maxFlushSize + 50; + int dataLength = maxFlushSize + chunkSize; // write data more than 1 chunk byte[] data1 = ContainerTestHelper.getFixedLengthString(keyString, dataLength) @@ -245,7 +240,7 @@ public class TestBlockOutputStreamWithFailures { public void testWatchForCommitDatanodeFailure() throws Exception { String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); - int dataLength = maxFlushSize + 50; + int dataLength = maxFlushSize + chunkSize; // write data more than 1 chunk byte[] data1 = ContainerTestHelper.getFixedLengthString(keyString, dataLength) @@ -332,7 +327,7 @@ public class TestBlockOutputStreamWithFailures { public void test2DatanodesFailure() throws Exception { String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); - int dataLength = maxFlushSize + 50; + int dataLength = maxFlushSize + chunkSize; // write data more than 1 chunk byte[] data1 = ContainerTestHelper.getFixedLengthString(keyString, dataLength) @@ -437,7 +432,7 @@ public class TestBlockOutputStreamWithFailures { public void testFailureWithPrimeSizedData() throws Exception { String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); - int dataLength = maxFlushSize + 69; + int dataLength = maxFlushSize + chunkSize; // write data more than 1 chunk byte[] data1 = ContainerTestHelper.getFixedLengthString(keyString, dataLength) @@ -573,7 +568,7 @@ public class TestBlockOutputStreamWithFailures { String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0, ReplicationFactor.ONE); - int dataLength = maxFlushSize + 50; + int dataLength = maxFlushSize + chunkSize; // write data more than 1 chunk byte[] data1 = 
ContainerTestHelper.getFixedLengthString(keyString, dataLength) @@ -664,7 +659,7 @@ public class TestBlockOutputStreamWithFailures { String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0, ReplicationFactor.ONE); - int dataLength = maxFlushSize + 50; + int dataLength = maxFlushSize + chunkSize; // write data more than 1 chunk byte[] data1 = ContainerTestHelper.getFixedLengthString(keyString, dataLength) @@ -758,7 +753,7 @@ public class TestBlockOutputStreamWithFailures { OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 3 * blockSize, ReplicationFactor.ONE); - int dataLength = maxFlushSize + 50; + int dataLength = maxFlushSize + chunkSize; // write data more than 1 chunk byte[] data1 = ContainerTestHelper.getFixedLengthString(keyString, dataLength) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java index 4207006..1972dac 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java @@ -55,6 +55,7 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY; /** * Tests the containerStateMachine failure handling. 
@@ -98,6 +99,7 @@ public class TestContainerStateMachine { conf.setQuietMode(false); OzoneManager.setTestSecureOmFlag(true); conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1); + conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false); // conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString()); cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java similarity index 61% copy from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java copy to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java index 4207006..1208652 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.client.rpc; +import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; @@ -30,36 +31,29 @@ import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; -import org.apache.hadoop.ozone.container.TestHelper; -import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine; -import org.apache.hadoop.ozone.container.common.transport.server.ratis.RatisServerConfiguration; +import org.apache.hadoop.ozone.container.ContainerTestHelper; import 
org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.junit.*; +import org.junit.rules.Timeout; import java.io.File; import java.io.IOException; -import java.nio.file.Path; import java.util.HashMap; import java.util.List; +import java.util.UUID; import java.util.concurrent.TimeUnit; -import org.junit.Rule; -import org.junit.rules.Timeout; -import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED; -import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; -import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.HddsConfigKeys.*; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; /** - * Tests the containerStateMachine failure handling. + * Tests the containerStateMachine failure handling with the flush delay enabled. */ -public class TestContainerStateMachine { +public class TestContainerStateMachineFlushDelay { /** * Set a timeout for each test. @@ -74,6 +68,11 @@ public class TestContainerStateMachine { private String volumeName; private String bucketName; private String path; + private static int chunkSize; + private static int flushSize; + private static int maxFlushSize; + private static int blockSize; + private static String keyString; /** * Create a MiniDFSCluster for testing. 
@@ -82,8 +81,13 @@ public class TestContainerStateMachine { */ @Before public void setup() throws Exception { + chunkSize = 100; + flushSize = 2 * chunkSize; + maxFlushSize = 2 * flushSize; + blockSize = 2 * maxFlushSize; + keyString = UUID.randomUUID().toString(); path = GenericTestUtils - .getTempPath(TestContainerStateMachine.class.getSimpleName()); + .getTempPath(TestContainerStateMachineFlushDelay.class.getSimpleName()); File baseDir = new File(path); baseDir.mkdirs(); @@ -101,6 +105,11 @@ public class TestContainerStateMachine { // conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString()); cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1) + .setBlockSize(blockSize) + .setChunkSize(chunkSize) + .setStreamBufferFlushSize(flushSize) + .setStreamBufferMaxSize(maxFlushSize) + .setStreamBufferSizeUnit(StorageUnit.BYTES) .setHbInterval(200) .setCertificateClient(new CertificateClientTestImpl(conf)) .build(); @@ -131,8 +140,14 @@ public class TestContainerStateMachine { objectStore.getVolume(volumeName).getBucket(bucketName) .createKey("ratis", 1024, ReplicationType.RATIS, ReplicationFactor.ONE, new HashMap<>()); + // ozone.client.stream.buffer.flush.delay is enabled by default, so write + // 110 bytes, more than the chunk size of 100, to make sure the flush + // below syncs the data.
+ byte[] data = + ContainerTestHelper.getFixedLengthString(keyString, 110) + .getBytes(UTF_8); // First write and flush creates a container in the datanode - key.write("ratis".getBytes()); + key.write(data); key.flush(); key.write("ratis".getBytes()); @@ -162,58 +177,4 @@ public class TestContainerStateMachine { == ContainerProtos.ContainerDataProto.State.UNHEALTHY); } - @Test - public void testRatisSnapshotRetention() throws Exception { - - ContainerStateMachine stateMachine = - (ContainerStateMachine) TestHelper.getStateMachine(cluster); - SimpleStateMachineStorage storage = - (SimpleStateMachineStorage) stateMachine.getStateMachineStorage(); - Assert.assertNull(storage.findLatestSnapshot()); - - // Write 10 keys. Num snapshots should be equal to config value. - for (int i = 1; i <= 10; i++) { - OzoneOutputStream key = - objectStore.getVolume(volumeName).getBucket(bucketName) - .createKey(("ratis" + i), 1024, ReplicationType.RATIS, - ReplicationFactor.ONE, new HashMap<>()); - // First write and flush creates a container in the datanode - key.write(("ratis" + i).getBytes()); - key.flush(); - key.write(("ratis" + i).getBytes()); - key.close(); - } - - RatisServerConfiguration ratisServerConfiguration = - conf.getObject(RatisServerConfiguration.class); - - stateMachine = - (ContainerStateMachine) TestHelper.getStateMachine(cluster); - storage = (SimpleStateMachineStorage) stateMachine.getStateMachineStorage(); - Path parentPath = storage.findLatestSnapshot().getFile().getPath(); - int numSnapshots = parentPath.getParent().toFile().listFiles().length; - Assert.assertTrue(Math.abs(ratisServerConfiguration - .getNumSnapshotsRetained() - numSnapshots) <= 1); - - // Write 10 more keys. Num Snapshots should remain the same. 
- for (int i = 11; i <= 20; i++) { - OzoneOutputStream key = - objectStore.getVolume(volumeName).getBucket(bucketName) - .createKey(("ratis" + i), 1024, ReplicationType.RATIS, - ReplicationFactor.ONE, new HashMap<>()); - // First write and flush creates a container in the datanode - key.write(("ratis" + i).getBytes()); - key.flush(); - key.write(("ratis" + i).getBytes()); - key.close(); - } - stateMachine = - (ContainerStateMachine) TestHelper.getStateMachine(cluster); - storage = (SimpleStateMachineStorage) stateMachine.getStateMachineStorage(); - parentPath = storage.findLatestSnapshot().getFile().getPath(); - numSnapshots = parentPath.getParent().toFile().listFiles().length; - Assert.assertTrue(Math.abs(ratisServerConfiguration - .getNumSnapshotsRetained() - numSnapshots) <= 1); - } - } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java index d172dd5..2edf369 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java @@ -123,7 +123,8 @@ public class TestFailureHandlingByClient { RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY+ "." 
+ "watch.request.timeout", 3, TimeUnit.SECONDS); - + conf.setBoolean( + OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false); conf.setQuietMode(false); conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, StaticMapping.class, DNSToSwitchMapping.class); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClientFlushDelay.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClientFlushDelay.java new file mode 100644 index 0000000..95e275b --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClientFlushDelay.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.ozone.client.rpc; + +import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.HddsUtils; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.ratis.RatisHelper; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.net.DNSToSwitchMapping; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.net.StaticMapping; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry; +import org.apache.hadoop.ozone.client.io.KeyOutputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.TestHelper; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.junit.*; +import org.junit.rules.Timeout; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; + +/** + * Tests Exception handling by 
Ozone Client by set flush delay. + */ +public class TestFailureHandlingByClientFlushDelay { + + /** + * Set a timeout for each test. + */ + @Rule + public Timeout timeout = new Timeout(300000); + + private MiniOzoneCluster cluster; + private OzoneConfiguration conf; + private OzoneClient client; + private ObjectStore objectStore; + private int chunkSize; + private int flushSize; + private int maxFlushSize; + private int blockSize; + private String volumeName; + private String bucketName; + private String keyString; + + /** + * Create a MiniDFSCluster for testing. + * <p> + * Ozone is made active by setting OZONE_ENABLED = true + * + * @throws IOException + */ + private void init() throws Exception { + conf = new OzoneConfiguration(); + chunkSize = 100; + flushSize = 2 * chunkSize; + maxFlushSize = 2 * flushSize; + blockSize = 4 * chunkSize; + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 100, TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10); + conf.setTimeDuration( + OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, + 1, TimeUnit.SECONDS); + conf.setTimeDuration( + OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY, + 1, TimeUnit.SECONDS); + conf.setBoolean( + OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, true); + conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 2); + conf.setTimeDuration( + RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." + + DatanodeRatisServerConfig.RATIS_SERVER_REQUEST_TIMEOUT_KEY, + 3, TimeUnit.SECONDS); + conf.setTimeDuration( + RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." + + DatanodeRatisServerConfig. + RATIS_SERVER_WATCH_REQUEST_TIMEOUT_KEY, + 3, TimeUnit.SECONDS); + conf.setTimeDuration( + RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY+ "." + + "rpc.request.timeout", + 3, TimeUnit.SECONDS); + conf.setTimeDuration( + RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY+ "." 
+ + "watch.request.timeout", + 3, TimeUnit.SECONDS); + conf.setQuietMode(false); + conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + StaticMapping.class, DNSToSwitchMapping.class); + StaticMapping.addNodeToRack(NetUtils.normalizeHostNames( + Collections.singleton(HddsUtils.getHostName(conf))).get(0), + "/rack1"); + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(10) + .setTotalPipelineNumLimit(15) + .setChunkSize(chunkSize) + .setBlockSize(blockSize) + .setStreamBufferFlushSize(flushSize) + .setStreamBufferMaxSize(maxFlushSize) + .setStreamBufferSizeUnit(StorageUnit.BYTES).build(); + cluster.waitForClusterToBeReady(); + //the easiest way to create an open container is creating a key + client = OzoneClientFactory.getRpcClient(conf); + objectStore = client.getObjectStore(); + keyString = UUID.randomUUID().toString(); + volumeName = "datanodefailurehandlingtest"; + bucketName = volumeName; + objectStore.createVolume(volumeName); + objectStore.getVolume(volumeName).createBucket(bucketName); + } + + private void startCluster() throws Exception { + init(); + } + + /** + * Shutdown MiniDFSCluster. 
+ */ + @After + public void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testPipelineExclusionWithPipelineFailure() throws Exception { + startCluster(); + String keyName = UUID.randomUUID().toString(); + OzoneOutputStream key = + createKey(keyName, ReplicationType.RATIS, blockSize); + String data = ContainerTestHelper + .getFixedLengthString(keyString, chunkSize); + + // get the name of a valid container + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream keyOutputStream = + (KeyOutputStream) key.getOutputStream(); + List<BlockOutputStreamEntry> streamEntryList = + keyOutputStream.getStreamEntries(); + + // Assert that 1 block will be preallocated + Assert.assertEquals(1, streamEntryList.size()); + key.write(data.getBytes()); + key.flush(); + long containerId = streamEntryList.get(0).getBlockID().getContainerID(); + BlockID blockId = streamEntryList.get(0).getBlockID(); + ContainerInfo container = + cluster.getStorageContainerManager().getContainerManager() + .getContainer(ContainerID.valueof(containerId)); + Pipeline pipeline = + cluster.getStorageContainerManager().getPipelineManager() + .getPipeline(container.getPipelineID()); + List<DatanodeDetails> datanodes = pipeline.getNodes(); + + // Two nodes, next write will hit AlreadyClosedException , the pipeline + // will be added in the exclude list + cluster.shutdownHddsDatanode(datanodes.get(0)); + cluster.shutdownHddsDatanode(datanodes.get(1)); + + key.write(data.getBytes()); + key.flush(); + Assert.assertTrue( + keyOutputStream.getExcludeList().getContainerIds().isEmpty()); + Assert.assertTrue( + keyOutputStream.getExcludeList().getDatanodes().isEmpty()); + Assert.assertTrue( + keyOutputStream.getExcludeList().getDatanodes().isEmpty()); + key.write(data.getBytes()); + // The close will just write to the buffer + key.close(); + + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) + 
.setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) + .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName) + .setRefreshPipeline(true) + .build(); + OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); + + // Make sure a new block is written + Assert.assertNotEquals( + keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly().get(0) + .getBlockID(), blockId); + Assert.assertEquals(3 * data.getBytes().length, keyInfo.getDataSize()); + validateData(keyName, data.concat(data).concat(data).getBytes()); + } + + private OzoneOutputStream createKey(String keyName, ReplicationType type, + long size) throws Exception { + return TestHelper + .createKey(keyName, type, size, objectStore, volumeName, bucketName); + } + + private void validateData(String keyName, byte[] data) throws Exception { + TestHelper + .validateData(keyName, data, objectStore, volumeName, bucketName); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java index 8f0ce00..d93d817 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java @@ -57,6 +57,7 @@ import org.junit.Rule; import org.junit.rules.Timeout; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY; /** * Tests failure detection and handling in BlockOutputStream Class. 
@@ -100,6 +101,7 @@ public class TestOzoneClientRetriesOnException { conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3); conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 3); conf.setQuietMode(false); + conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(7) .setTotalPipelineNumLimit(10) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2BlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java similarity index 54% rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2BlockOutputStream.java rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java index cf719b2..b9029ea 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2BlockOutputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java @@ -14,12 +14,19 @@ * License for the specific language governing permissions and limitations under * the License. 
*/ - package org.apache.hadoop.ozone.client.rpc; import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfigKeys; @@ -30,44 +37,42 @@ import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.TestHelper; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; +import org.apache.ratis.protocol.GroupMismatchException; +import org.junit.*; +import org.junit.rules.Timeout; import java.io.IOException; import java.io.OutputStream; import java.util.UUID; import java.util.concurrent.TimeUnit; -import org.junit.Rule; -import org.junit.rules.Timeout; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY; /** - * Tests BlockOutputStream class. + * Tests failure detection and handling in BlockOutputStream Class by set + * flush delay. 
*/ -public class Test2BlockOutputStream { +public class TestOzoneClientRetriesOnExceptionFlushDelay { /** - * Set a timeout for each test. - */ + * Set a timeout for each test. + */ @Rule public Timeout timeout = new Timeout(300000); + private static MiniOzoneCluster cluster; - private static OzoneConfiguration conf = new OzoneConfiguration(); - private static OzoneClient client; - private static ObjectStore objectStore; - private static int chunkSize; - private static int flushSize; - private static int maxFlushSize; - private static int blockSize; - private static String volumeName; - private static String bucketName; - private static String keyString; + private OzoneConfiguration conf = new OzoneConfiguration(); + private OzoneClient client; + private ObjectStore objectStore; + private int chunkSize; + private int flushSize; + private int maxFlushSize; + private int blockSize; + private String volumeName; + private String bucketName; + private String keyString; + private XceiverClientManager xceiverClientManager; /** * Create a MiniDFSCluster for testing. 
@@ -76,19 +81,18 @@ public class Test2BlockOutputStream { * * @throws IOException */ - @BeforeClass - public static void init() throws Exception { + @Before + public void init() throws Exception { chunkSize = 100; flushSize = 2 * chunkSize; maxFlushSize = 2 * flushSize; blockSize = 2 * maxFlushSize; - conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, + TimeUnit.MILLISECONDS); conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); + conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3); + conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 3); conf.setQuietMode(false); - conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, - StorageUnit.MB); - conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, true); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(7) .setTotalPipelineNumLimit(10) @@ -102,8 +106,9 @@ public class Test2BlockOutputStream { //the easiest way to create an open container is creating a key client = OzoneClientFactory.getRpcClient(conf); objectStore = client.getObjectStore(); + xceiverClientManager = new XceiverClientManager(conf); keyString = UUID.randomUUID().toString(); - volumeName = "testblockoutputstream"; + volumeName = "testblockoutputstreamwithretries"; bucketName = volumeName; objectStore.createVolume(volumeName); objectStore.getVolume(volumeName).createBucket(bucketName); @@ -116,66 +121,67 @@ public class Test2BlockOutputStream { /** * Shutdown MiniDFSCluster. 
*/ - @AfterClass - public static void shutdown() { + @After + public void shutdown() { if (cluster != null) { cluster.shutdown(); } } @Test - public void testFlushChunkDelay() throws Exception { - String keyName1 = getKeyName(); - OzoneOutputStream key1 = createKey(keyName1, ReplicationType.RATIS, 0); - + public void testGroupMismatchExceptionHandling() throws Exception { + String keyName = getKeyName(); + // make sure flush will sync data. + int dataLength = maxFlushSize + chunkSize; + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, + dataLength); + // write data more than 1 chunk byte[] data1 = - ContainerTestHelper.getFixedLengthString(keyString, 10) + ContainerTestHelper.getFixedLengthString(keyString, dataLength) .getBytes(UTF_8); - key1.write(data1); - key1.flush(); - KeyOutputStream keyOutputStream = (KeyOutputStream)key1.getOutputStream(); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream(); + long containerID = + keyOutputStream.getStreamEntries().get(0). 
+ getBlockID().getContainerID(); Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); + ContainerInfo container = + cluster.getStorageContainerManager().getContainerManager() + .getContainer(ContainerID.valueof(containerID)); + Pipeline pipeline = + cluster.getStorageContainerManager().getPipelineManager() + .getPipeline(container.getPipelineID()); + XceiverClientSpi xceiverClient = + xceiverClientManager.acquireClient(pipeline); + xceiverClient.sendCommand(ContainerTestHelper + .getCreateContainerRequest(containerID, pipeline)); + xceiverClientManager.releaseClient(xceiverClient, false); + key.write(data1); OutputStream stream = keyOutputStream.getStreamEntries().get(0) .getOutputStream(); Assert.assertTrue(stream instanceof BlockOutputStream); BlockOutputStream blockOutputStream = (BlockOutputStream) stream; - - // we have just written data(length 10) less than chunk Size, - // at this time we call flush will not sync data. - Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength()); - key1.close(); - validateData(keyName1, data1); - - String keyName2 = getKeyName(); - OzoneOutputStream key2 = createKey(keyName2, ReplicationType.RATIS, 0); - byte[] data2 = - ContainerTestHelper.getFixedLengthString(keyString, 110) - .getBytes(UTF_8); - key2.write(data2); - key2.flush(); - keyOutputStream = (KeyOutputStream)key2.getOutputStream(); - Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1); - stream = keyOutputStream.getStreamEntries().get(0) - .getOutputStream(); - Assert.assertTrue(stream instanceof BlockOutputStream); - blockOutputStream = (BlockOutputStream) stream; - - // we have just written data(length 110) greater than chunk Size, - // at this time we call flush will sync data. 
- Assert.assertEquals(data2.length, - blockOutputStream.getTotalDataFlushedLength()); - key2.close(); - validateData(keyName2, data2); + TestHelper.waitForPipelineClose(key, cluster, false); + key.flush(); + Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream + .getIoException()) instanceof GroupMismatchException); + Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds() + .contains(pipeline.getId())); + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2); + key.close(); + Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0); + validateData(keyName, data1); } private OzoneOutputStream createKey(String keyName, ReplicationType type, - long size) throws Exception { + long size) throws Exception { return TestHelper - .createKey(keyName, type, size, objectStore, volumeName, bucketName); + .createKey(keyName, type, ReplicationFactor.ONE, + size, objectStore, volumeName, bucketName); } + private void validateData(String keyName, byte[] data) throws Exception { TestHelper .validateData(keyName, data, objectStore, volumeName, bucketName); } - } --------------------------------------------------------------------- To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org