This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch HDDS-4454 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit c5343bc73b130db1632df8b3d3c3ff7d55259283 Author: Kaijie Chen <[email protected]> AuthorDate: Tue Jul 12 18:49:09 2022 +0800 HDDS-6955. [Ozone-streaming] Add explicit stream flag in ozone shell (#3559) (cherry picked from commit 6319ce8ad60d62f66ed71702c17b09ad04da8dd7) --- .../hadoop/hdds/client/ReplicationConfig.java | 11 +++ .../hadoop/ozone/shell/keys/PutKeyHandler.java | 90 ++++++++++++++-------- 2 files changed, 67 insertions(+), 34 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java index 610419527a..7542409679 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java @@ -76,6 +76,17 @@ public interface ReplicationConfig { return parse(null, replication, config); } + static ReplicationConfig resolve(ReplicationConfig replicationConfig, + ReplicationConfig bucketReplicationConfig, ConfigurationSource conf) { + if (replicationConfig == null) { + replicationConfig = bucketReplicationConfig; + } + if (replicationConfig == null) { + replicationConfig = getDefault(conf); + } + return replicationConfig; + } + /** * Helper method to serialize from proto. * <p> diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java index 1f7c1ef7f4..16af2c5fd1 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java @@ -29,6 +29,7 @@ import java.nio.channels.FileChannel; import java.util.HashMap; import java.util.Map; +import com.google.common.base.Preconditions; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; @@ -42,13 +43,13 @@ import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput; import org.apache.hadoop.ozone.shell.OzoneAddress; import org.apache.commons.codec.digest.DigestUtils; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY; import org.apache.hadoop.ozone.shell.ShellReplicationOptions; import picocli.CommandLine.Command; import picocli.CommandLine.Mixin; +import picocli.CommandLine.Option; import picocli.CommandLine.Parameters; /** @@ -61,6 +62,9 @@ public class PutKeyHandler extends KeyHandler { @Parameters(index = "1", arity = "1..1", description = "File to upload") private String fileName; + @Option(names = "--stream") + private boolean stream; + @Mixin private ShellReplicationOptions replication; @@ -96,41 +100,59 @@ public class PutKeyHandler extends KeyHandler { int chunkSize = (int) getConf().getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, OZONE_SCM_CHUNK_SIZE_DEFAULT, StorageUnit.BYTES); - Boolean useAsync = false; - if (dataFile.length() <= chunkSize || - (replicationConfig != null && - replicationConfig.getReplicationType() == EC) || - bucket.getReplicationConfig() instanceof ECReplicationConfig) { - useAsync = true; - } - if (useAsync) { - if (isVerbose()) { - out().println("API: async"); - } - try (InputStream input = new FileInputStream(dataFile); - OutputStream output = bucket.createKey(keyName, dataFile.length(), - replicationConfig, keyMetadata)) { - IOUtils.copyBytes(input, output, chunkSize); - } + if (stream) { + stream(dataFile, bucket, keyName, keyMetadata, + replicationConfig, chunkSize); } else { - if (isVerbose()) { - out().println("API: streaming"); - } - try (RandomAccessFile raf = new RandomAccessFile(dataFile, "r"); - OzoneDataStreamOutput out = bucket.createStreamKey(keyName, - dataFile.length(), replicationConfig, keyMetadata)) { - FileChannel ch = raf.getChannel(); - long len = raf.length(); - long off = 0; - while (len > 0) { - long writeLen = Math.min(len, chunkSize); - ByteBuffer bb = ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen); - out.write(bb); - off += writeLen; - len -= writeLen; - } - } + async(dataFile, bucket, keyName, keyMetadata, + replicationConfig, chunkSize); + } + } + + void async( + File dataFile, OzoneBucket bucket, + String keyName, Map<String, String> keyMetadata, + ReplicationConfig replicationConfig, int chunkSize) + throws IOException { + if (isVerbose()) { + out().println("API: async"); + } + try (InputStream input = new FileInputStream(dataFile); + OutputStream output = bucket.createKey(keyName, dataFile.length(), + replicationConfig, keyMetadata)) { + IOUtils.copyBytes(input, output, chunkSize); } } + void stream( + File dataFile, OzoneBucket bucket, + String keyName, Map<String, String> keyMetadata, + ReplicationConfig replicationConfig, int chunkSize) + throws IOException { + if (isVerbose()) { + out().println("API: streaming"); + } + // In streaming mode, always resolve replication config at client side, + // because streaming is not compatible for writing EC keys. + replicationConfig = ReplicationConfig.resolve(replicationConfig, + bucket.getReplicationConfig(), getConf()); + Preconditions.checkArgument( + !(replicationConfig instanceof ECReplicationConfig), + "Can not put EC key by streaming"); + + try (RandomAccessFile raf = new RandomAccessFile(dataFile, "r"); + OzoneDataStreamOutput out = bucket.createStreamKey(keyName, + dataFile.length(), replicationConfig, keyMetadata)) { + FileChannel ch = raf.getChannel(); + long len = raf.length(); + long off = 0; + while (len > 0) { + long writeLen = Math.min(len, chunkSize); + ByteBuffer bb = ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen); + out.write(bb); + off += writeLen; + len -= writeLen; + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
