This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch HDDS-4454 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 226454dfa3758ea2bd95fbb5bab7d49ffc410d24 Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Fri Nov 11 08:59:46 2022 -0800 HDDS-7478. [Ozone-Streaming] NPE in when creating a file with o3fs. (#3949) (cherry picked from commit c17be7013ade5920ef2ae056b48b8b1d8b04d136) --- .../impl/TestKeyValueStreamDataChannel.java | 2 +- .../fs/ozone/TestOzoneFileSystemWithStreaming.java | 158 +++++++++++++++++++++ .../client/rpc/TestBlockDataStreamOutput.java | 2 - .../fs/ozone/BasicOzoneClientAdapterImpl.java | 23 +-- .../ozone/BasicRootedOzoneClientAdapterImpl.java | 23 ++- 5 files changed, 174 insertions(+), 34 deletions(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java index d252b1cb1b..ddbd4b39f4 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java @@ -199,7 +199,7 @@ public class TestKeyValueStreamDataChannel { @Override public CompletableFuture<DataStreamReply> writeAsync( - ByteBuffer src, WriteOption... writeOptions) { + ByteBuffer src, Iterable<WriteOption> writeOptions) { final int written; try { written = writeBuffers( diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java new file mode 100644 index 0000000000..f2aa527598 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreaming.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.ozone; + +import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.TestDataUtil; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.junit.AfterClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.rules.Timeout; + +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED; +import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME; +import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT; +import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER; +import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY; + +/** + * Ozone file system tests with Streaming. + */ +public class TestOzoneFileSystemWithStreaming { + @Rule + public Timeout timeout = Timeout.seconds(300); + + private static MiniOzoneCluster cluster; + private static OzoneBucket bucket; + + private final OzoneConfiguration conf = new OzoneConfiguration(); + + { + try { + init(); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + private void init() throws Exception { + final int chunkSize = 16 << 10; + final int flushSize = 2 * chunkSize; + final int maxFlushSize = 2 * flushSize; + final int blockSize = 2 * maxFlushSize; + final BucketLayout layout = BucketLayout.FILE_SYSTEM_OPTIMIZED; + + conf.setBoolean(DFS_CONTAINER_RATIS_DATASTREAM_ENABLED, true); + conf.setBoolean(OZONE_FS_DATASTREAM_ENABLED, true); + conf.setBoolean(OZONE_OM_RATIS_ENABLE_KEY, false); + conf.set(OZONE_DEFAULT_BUCKET_LAYOUT, layout.name()); + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(5) + .setTotalPipelineNumLimit(10) + .setBlockSize(blockSize) + .setChunkSize(chunkSize) + .setStreamBufferFlushSize(flushSize) + .setStreamBufferMaxSize(maxFlushSize) + .setDataStreamBufferFlushize(maxFlushSize) + .setStreamBufferSizeUnit(StorageUnit.BYTES) + .setDataStreamMinPacketSize(chunkSize) + .setDataStreamStreamWindowSize(5 * chunkSize) + .build(); + cluster.waitForClusterToBeReady(); + + // create a volume and a bucket to be used by OzoneFileSystem + bucket = TestDataUtil.createVolumeAndBucket(cluster, layout); + } + + @AfterClass + public static void teardown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testO3fsCreateFile() throws Exception { + // Set the fs.defaultFS + final String rootPath = String.format("%s://%s.%s/", + OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName()); + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + + final Path file = new Path("/file"); + + try (FileSystem fs = FileSystem.get(conf)) { + runTestCreateFile(fs, file); + } + } + + @Test + public void testOfsCreateFile() throws Exception { + // Set the fs.defaultFS + final String rootPath = String.format("%s://%s/", + OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY)); + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + + final String dir = OZONE_ROOT + bucket.getVolumeName() + + OZONE_URI_DELIMITER + bucket.getName(); + final Path file = new Path(dir, "file"); + + try (FileSystem fs = FileSystem.get(conf)) { + runTestCreateFile(fs, file); + } + } + + static void runTestCreateFile(FileSystem fs, Path file) throws Exception { + final byte[] bytes = new byte[1 << 20]; + ThreadLocalRandom.current().nextBytes(bytes); + + ContractTestUtils.createFile(fs, file, true, bytes); + + final byte[] buffer = new byte[4 << 10]; + int offset = 0; + try (FSDataInputStream in = fs.open(file)) { + for (; ;) { + final int n = in.read(buffer, 0, buffer.length); + if (n <= 0) { + break; + } + for (int i = 0; i < n; i++) { + Assertions.assertEquals(bytes[offset + i], buffer[i]); + } + offset += n; + } + } + Assertions.assertEquals(bytes.length, offset); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java index c8a0115a80..f232a9298e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java @@ -49,7 +49,6 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; /** @@ -91,7 +90,6 @@ public class TestBlockDataStreamOutput { OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); conf.setFromObject(clientConfig); - conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); conf.setQuietMode(false); conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java index a1584b1a95..36c6c0ffc7 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java @@ -272,22 +272,13 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter { boolean overWrite, boolean recursive) throws IOException { incrementCounter(Statistic.OBJECTS_CREATED, 1); try { - OzoneDataStreamOutput ozoneDataStreamOutput = null; - if (replication == ReplicationFactor.ONE.getValue() - || replication == ReplicationFactor.THREE.getValue()) { - - ReplicationConfig customReplicationConfig = - ReplicationConfig.adjustReplication(bucketReplicationConfig, - replication, config); - ozoneDataStreamOutput = bucket - .createStreamFile(key, 0, customReplicationConfig, overWrite, - recursive); - } else { - ozoneDataStreamOutput = bucket.createStreamFile( - key, 0, bucketReplicationConfig, overWrite, recursive); - } - return new OzoneFSDataStreamOutput( - ozoneDataStreamOutput.getByteBufStreamOutput()); + final ReplicationConfig replicationConfig + = OzoneClientUtils.resolveClientSideReplicationConfig( + replication, clientConfiguredReplicationConfig, + getReplicationConfigWithRefreshCheck(), config); + final OzoneDataStreamOutput out = bucket.createStreamFile( + key, 0, replicationConfig, overWrite, recursive); + return new OzoneFSDataStreamOutput(out.getByteBufStreamOutput()); } catch (OMException ex) { if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) { diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java index 9d1bc4980d..ac3d38b958 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java @@ -406,21 +406,14 @@ public class BasicRootedOzoneClientAdapterImpl String key = ofsPath.getKeyName(); try { // Hadoop CopyCommands class always sets recursive to true - OzoneBucket bucket = getBucket(ofsPath, recursive); - OzoneDataStreamOutput ozoneDataStreamOutput = null; - if (replication == ReplicationFactor.ONE.getValue() - || replication == ReplicationFactor.THREE.getValue()) { - - ozoneDataStreamOutput = bucket.createStreamFile(key, 0, - ReplicationConfig.adjustReplication( - clientConfiguredReplicationConfig, replication, config), - overWrite, recursive); - } else { - ozoneDataStreamOutput = bucket.createStreamFile( - key, 0, clientConfiguredReplicationConfig, overWrite, recursive); - } - return new OzoneFSDataStreamOutput( - ozoneDataStreamOutput.getByteBufStreamOutput()); + final OzoneBucket bucket = getBucket(ofsPath, recursive); + final ReplicationConfig replicationConfig + = OzoneClientUtils.resolveClientSideReplicationConfig( + replication, clientConfiguredReplicationConfig, + bucket.getReplicationConfig(), config); + final OzoneDataStreamOutput out = bucket.createStreamFile( + key, 0, replicationConfig, overWrite, recursive); + return new OzoneFSDataStreamOutput(out.getByteBufStreamOutput()); } catch (OMException ex) { if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
