This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a23991237 HDDS-10435. Support S3 object tags for existing requests 
(#6607)
8a23991237 is described below

commit 8a239912378f8b2aa5de84d9f913821096b74fcb
Author: Ivan Andika <[email protected]>
AuthorDate: Wed May 22 14:42:32 2024 +0800

    HDDS-10435. Support S3 object tags for existing requests (#6607)
---
 .../apache/hadoop/ozone/OzoneManagerVersion.java   |   2 +
 .../apache/hadoop/ozone/client/OzoneBucket.java    |  85 +++++++-
 .../org/apache/hadoop/ozone/client/OzoneKey.java   |  25 ++-
 .../hadoop/ozone/client/OzoneKeyDetails.java       |   4 +-
 .../ozone/client/protocol/ClientProtocol.java      |  52 ++++-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  56 ++++-
 .../apache/hadoop/ozone/om/helpers/OmKeyArgs.java  |  20 +-
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  |  37 +++-
 .../apache/hadoop/ozone/om/helpers/WithTags.java   |  30 +++
 ...OzoneManagerProtocolClientSideTranslatorPB.java |   7 +-
 .../src/main/smoketest/s3/MultipartUpload.robot    |  14 +-
 .../dist/src/main/smoketest/s3/objectcopy.robot    |  21 +-
 .../dist/src/main/smoketest/s3/objectputget.robot  |  32 ++-
 .../client/rpc/TestOzoneRpcClientAbstract.java     | 107 +++++++++-
 .../src/main/proto/OmClientProtocol.proto          |   4 +
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |   8 +
 .../S3InitiateMultipartUploadRequest.java          |   1 +
 .../S3InitiateMultipartUploadRequestWithFSO.java   |   1 +
 .../S3MultipartUploadCompleteRequest.java          |   6 +-
 .../ozone/om/request/OMRequestTestUtils.java       |   7 +-
 .../om/request/key/TestOMKeyCreateRequest.java     |  48 ++++-
 .../TestS3InitiateMultipartUploadRequest.java      |   9 +-
 ...estS3InitiateMultipartUploadRequestWithFSO.java |   9 +-
 .../s3/multipart/TestS3MultipartRequest.java       |  43 +++-
 .../TestS3MultipartUploadCompleteRequest.java      |  20 +-
 .../hadoop/ozone/s3/endpoint/EndpointBase.java     |  84 ++++++++
 .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java   |  50 ++++-
 .../ozone/s3/endpoint/ObjectEndpointStreaming.java |   5 +-
 .../hadoop/ozone/s3/exception/S3ErrorTable.java    |   6 +
 .../org/apache/hadoop/ozone/s3/util/S3Consts.java  |  12 ++
 .../hadoop/ozone/client/ClientProtocolStub.java    |  26 +++
 .../hadoop/ozone/client/OzoneBucketStub.java       |  34 ++-
 .../hadoop/ozone/s3/endpoint/TestObjectGet.java    |  75 +++++--
 .../hadoop/ozone/s3/endpoint/TestObjectPut.java    | 229 ++++++++++++++++++++-
 .../ozone/s3/endpoint/TestPermissionCheck.java     |   4 +-
 35 files changed, 1071 insertions(+), 102 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java
index 985c238fd7..c55945d537 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java
@@ -40,6 +40,8 @@ public enum OzoneManagerVersion implements ComponentVersion {
   LIGHTWEIGHT_LIST_KEYS(4, "OzoneManager version that supports lightweight"
       + " listKeys API."),
 
+  OBJECT_TAG(5, "OzoneManager version that supports object tags"),
+
   FUTURE_VERSION(-1, "Used internally in the client when the server side is "
       + " newer and an unknown server version has arrived to the client.");
 
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index 8d153a948c..6972831477 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -54,7 +54,6 @@ import java.io.IOException;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -431,7 +430,7 @@ public class OzoneBucket extends WithMetadata {
   public OzoneOutputStream createKey(String key, long size)
       throws IOException {
     return createKey(key, size, defaultReplication,
-        new HashMap<>());
+        Collections.emptyMap());
   }
 
   /**
@@ -459,6 +458,7 @@ public class OzoneBucket extends WithMetadata {
    * @param key               Name of the key to be created.
    * @param size              Size of the data the key will point to.
    * @param replicationConfig Replication configuration.
+   * @param keyMetadata       Custom key metadata.
    * @return OzoneOutputStream to which the data has to be written.
    * @throws IOException
    */
@@ -466,8 +466,27 @@ public class OzoneBucket extends WithMetadata {
       ReplicationConfig replicationConfig,
       Map<String, String> keyMetadata)
       throws IOException {
+    return this.createKey(key, size, replicationConfig, keyMetadata, 
Collections.emptyMap());
+  }
+
+  /**
+   * Creates a new key in the bucket.
+   *
+   * @param key               Name of the key to be created.
+   * @param size              Size of the data the key will point to.
+   * @param replicationConfig Replication configuration.
+   * @param keyMetadata       Custom key metadata.
+   * @param tags              Tags used for S3 object tags
+   * @return OzoneOutputStream to which the data has to be written.
+   * @throws IOException
+   */
+  public OzoneOutputStream createKey(String key, long size,
+      ReplicationConfig replicationConfig,
+      Map<String, String> keyMetadata,
+      Map<String, String> tags)
+      throws IOException {
     return proxy
-        .createKey(volumeName, name, key, size, replicationConfig, 
keyMetadata);
+        .createKey(volumeName, name, key, size, replicationConfig, 
keyMetadata, tags);
   }
 
   /**
@@ -491,6 +510,7 @@ public class OzoneBucket extends WithMetadata {
    * @param key               Name of the key to be created.
    * @param size              Size of the data the key will point to.
    * @param replicationConfig Replication configuration.
+   * @param keyMetadata       Custom key metadata.
    * @return OzoneDataStreamOutput to which the data has to be written.
    * @throws IOException
    */
@@ -500,8 +520,28 @@ public class OzoneBucket extends WithMetadata {
     if (replicationConfig == null) {
       replicationConfig = defaultReplication;
     }
+    return this.createStreamKey(key, size, replicationConfig, keyMetadata,
+        Collections.emptyMap());
+  }
+
+  /**
+   * Creates a new key in the bucket.
+   *
+   * @param key               Name of the key to be created.
+   * @param size              Size of the data the key will point to.
+   * @param replicationConfig Replication configuration.
+   * @param keyMetadata       Custom key metadata.
+   * @return OzoneDataStreamOutput to which the data has to be written.
+   * @throws IOException
+   */
+  public OzoneDataStreamOutput createStreamKey(String key, long size,
+      ReplicationConfig replicationConfig, Map<String, String> keyMetadata,
+      Map<String, String> tags) throws IOException {
+    if (replicationConfig == null) {
+      replicationConfig = defaultReplication;
+    }
     return proxy.createStreamKey(volumeName, name, key, size,
-        replicationConfig, keyMetadata);
+        replicationConfig, keyMetadata, tags);
   }
 
   /**
@@ -659,11 +699,12 @@ public class OzoneBucket extends WithMetadata {
 
   /**
    * Initiate multipart upload for a specified key.
-   * @param keyName
-   * @param type
-   * @param factor
+   * @param keyName Name of the key to be created when the multipart upload is 
completed.
+   * @param type Replication type to be used.
+   * @param factor Replication factor of the key.
    * @return OmMultipartInfo
    * @throws IOException
+   * @deprecated Use {@link OzoneBucket#initiateMultipartUpload(String, 
ReplicationConfig)} instead.
    */
   @Deprecated
   public OmMultipartInfo initiateMultipartUpload(String keyName,
@@ -676,6 +717,10 @@ public class OzoneBucket extends WithMetadata {
 
   /**
    * Initiate multipart upload for a specified key.
+   * @param keyName Name of the key to be created when the multipart upload is 
completed.
+   * @param config Replication config.
+   * @return OmMultipartInfo
+   * @throws IOException
    */
   public OmMultipartInfo initiateMultipartUpload(String keyName,
       ReplicationConfig config)
@@ -685,11 +730,32 @@ public class OzoneBucket extends WithMetadata {
 
   /**
    * Initiate multipart upload for a specified key.
+   * @param keyName Name of the key to be created when the multipart upload is 
completed.
+   * @param config Replication config.
+   * @param metadata Custom key metadata.
+   * @return OmMultipartInfo
+   * @throws IOException
    */
   public OmMultipartInfo initiateMultipartUpload(String keyName,
       ReplicationConfig config, Map<String, String> metadata)
       throws IOException {
-    return proxy.initiateMultipartUpload(volumeName, name, keyName, config, 
metadata);
+    return initiateMultipartUpload(keyName, config, metadata, 
Collections.emptyMap());
+  }
+
+  /**
+   * Initiate multipart upload for a specified key.
+   * @param keyName Name of the key to be created when the multipart upload is 
completed.
+   * @param config Replication config.
+   * @param metadata Custom key metadata.
+   * @param tags Tags used for S3 object tags.
+   * @return OmMultipartInfo
+   * @throws IOException
+   */
+  public OmMultipartInfo initiateMultipartUpload(String keyName,
+      ReplicationConfig config, Map<String, String> metadata,
+      Map<String, String> tags)
+      throws IOException {
+    return proxy.initiateMultipartUpload(volumeName, name, keyName, config, 
metadata, tags);
   }
 
   /**
@@ -1311,7 +1377,8 @@ public class OzoneBucket extends WithMetadata {
         keyInfo.getReplicationConfig(),
         metadata,
         keyInfo.isFile(),
-        keyInfo.getOwnerName());
+        keyInfo.getOwnerName(),
+        Collections.emptyMap());
   }
 
 
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java
index 3663f6f654..fdd89fe819 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java
@@ -63,7 +63,9 @@ public class OzoneKey {
 
   private ReplicationConfig replicationConfig;
 
-  private Map<String, String> metadata = new HashMap<>();
+  private final Map<String, String> metadata = new HashMap<>();
+
+  private final Map<String, String> tags = new HashMap<>();
 
   /**
    * Indicator if key is a file.
@@ -94,10 +96,12 @@ public class OzoneKey {
   public OzoneKey(String volumeName, String bucketName,
                   String keyName, long size, long creationTime,
                   long modificationTime, ReplicationConfig replicationConfig,
-                  Map<String, String> metadata, boolean isFile, String owner) {
+                  Map<String, String> metadata, boolean isFile, String owner,
+                  Map<String, String> tags) {
     this(volumeName, bucketName, keyName, size, creationTime,
         modificationTime, replicationConfig, isFile, owner);
     this.metadata.putAll(metadata);
+    this.tags.putAll(tags);
   }
 
   /**
@@ -163,10 +167,24 @@ public class OzoneKey {
     return modificationTime;
   }
 
+  /**
+   * Returns the metadata of the key.
+   *
+   * @return key metadata.
+   */
   public Map<String, String> getMetadata() {
     return metadata;
   }
 
+  /**
+   * Returns the tags of the key.
+   *
+   * @return key tags.
+   */
+  public Map<String, String> getTags() {
+    return tags;
+  }
+
   public void setMetadata(Map<String, String> metadata) {
     this.metadata.putAll(metadata);
   }
@@ -205,7 +223,8 @@ public class OzoneKey {
     return new OzoneKey(keyInfo.getVolumeName(), keyInfo.getBucketName(),
         keyInfo.getKeyName(), keyInfo.getDataSize(), keyInfo.getCreationTime(),
         keyInfo.getModificationTime(), keyInfo.getReplicationConfig(),
-        keyInfo.getMetadata(), keyInfo.isFile(), keyInfo.getOwnerName());
+        keyInfo.getMetadata(), keyInfo.isFile(), keyInfo.getOwnerName(),
+        keyInfo.getTags());
   }
 
 }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java
index 6b44fa1dca..168e15d9bd 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java
@@ -53,9 +53,9 @@ public class OzoneKeyDetails extends OzoneKey {
       Map<String, String> metadata,
       FileEncryptionInfo feInfo,
       CheckedSupplier<OzoneInputStream, IOException> contentSupplier,
-      boolean isFile, String owner) {
+      boolean isFile, String owner, Map<String, String> tags) {
     super(volumeName, bucketName, keyName, size, creationTime,
-        modificationTime, replicationConfig, metadata, isFile, owner);
+        modificationTime, replicationConfig, metadata, isFile, owner, tags);
     this.ozoneKeyLocations = ozoneKeyLocations;
     this.feInfo = feInfo;
     this.contentSupplier = contentSupplier;
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 912a3138c4..f7b84e487d 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -329,7 +329,7 @@ public interface ClientProtocol {
    * @param size Size of the data
    * @param metadata Custom key value metadata
    * @return {@link OzoneOutputStream}
-   *
+   * @deprecated Use {@link ClientProtocol#createKey(String, String, String, 
long, ReplicationConfig, Map)} instead.
    */
   @Deprecated
   OzoneOutputStream createKey(String volumeName, String bucketName,
@@ -344,7 +344,7 @@ public interface ClientProtocol {
    * @param bucketName Name of the Bucket
    * @param keyName Name of the Key
    * @param size Size of the data
-   * @param metadata custom key value metadata
+   * @param metadata Custom key value metadata
    * @return {@link OzoneOutputStream}
    *
    */
@@ -353,6 +353,22 @@ public interface ClientProtocol {
       Map<String, String> metadata)
       throws IOException;
 
+  /**
+   * Writes a key in an existing bucket.
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param keyName Name of the Key
+   * @param size Size of the data
+   * @param metadata Custom key value metadata
+   * @param tags Tags used for S3 object tags
+   * @return {@link OzoneOutputStream}
+   *
+   */
+  OzoneOutputStream createKey(String volumeName, String bucketName,
+      String keyName, long size, ReplicationConfig replicationConfig,
+      Map<String, String> metadata, Map<String, String> tags)
+      throws IOException;
+
   /**
    * Writes a key in an existing bucket.
    * @param volumeName Name of the Volume
@@ -368,6 +384,22 @@ public interface ClientProtocol {
       Map<String, String> metadata)
       throws IOException;
 
+  /**
+   * Writes a key in an existing bucket.
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param keyName Name of the Key
+   * @param size Size of the data
+   * @param metadata custom key value metadata
+   * @param tags Tags used for S3 object tags
+   * @return {@link OzoneDataStreamOutput}
+   *
+   */
+  OzoneDataStreamOutput createStreamKey(String volumeName, String bucketName,
+      String keyName, long size, ReplicationConfig replicationConfig,
+      Map<String, String> metadata, Map<String, String> tags)
+      throws IOException;
+
   /**
    * Reads a key from an existing bucket.
    * @param volumeName Name of the Volume
@@ -535,6 +567,22 @@ public interface ClientProtocol {
       Map<String, String> metadata)
       throws IOException;
 
+  /**
+   * Initiate Multipart upload.
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param keyName Name of the Key
+   * @param replicationConfig Replication config
+   * @param metadata Custom key value metadata
+   * @param tags Tags used for S3 object tags
+   * @return {@link OmMultipartInfo}
+   * @throws IOException
+   */
+  OmMultipartInfo initiateMultipartUpload(String volumeName, String
+      bucketName, String keyName, ReplicationConfig replicationConfig,
+      Map<String, String> metadata, Map<String, String> tags)
+      throws IOException;
+
   /**
    * Create a part key for a multipart upload key.
    * @param volumeName
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 2f97f2f3cc..42b53e0d23 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -1387,6 +1387,15 @@ public class RpcClient implements ClientProtocol {
       ReplicationConfig replicationConfig,
       Map<String, String> metadata)
       throws IOException {
+    return createKey(volumeName, bucketName, keyName, size, replicationConfig,
+        metadata, Collections.emptyMap());
+  }
+
+  @Override
+  public OzoneOutputStream createKey(
+      String volumeName, String bucketName, String keyName, long size,
+      ReplicationConfig replicationConfig,
+      Map<String, String> metadata, Map<String, String> tags) throws 
IOException {
     verifyVolumeName(volumeName);
     verifyBucketName(bucketName);
     if (checkKeyNameEnabled) {
@@ -1404,6 +1413,12 @@ public class RpcClient implements ClientProtocol {
       }
     }
 
+    if (omVersion.compareTo(OzoneManagerVersion.OBJECT_TAG) < 0) {
+      if (tags != null && !tags.isEmpty()) {
+        throw new IOException("OzoneManager does not support object tags");
+      }
+    }
+
     if (replicationConfig != null) {
       replicationConfigValidator.validate(replicationConfig);
     }
@@ -1416,6 +1431,7 @@ public class RpcClient implements ClientProtocol {
         .setDataSize(size)
         .setReplicationConfig(replicationConfig)
         .addAllMetadataGdpr(metadata)
+        .addAllTags(tags)
         .setAcls(getAclList())
         .setLatestVersionLocation(getLatestVersionLocation)
         .setOwnerName(ownerName);
@@ -1437,12 +1453,28 @@ public class RpcClient implements ClientProtocol {
       ReplicationConfig replicationConfig,
       Map<String, String> metadata)
       throws IOException {
+    return createStreamKey(volumeName, bucketName, keyName, size, 
replicationConfig,
+        metadata, Collections.emptyMap());
+  }
+
+  @Override
+  public OzoneDataStreamOutput createStreamKey(
+      String volumeName, String bucketName, String keyName, long size,
+      ReplicationConfig replicationConfig,
+      Map<String, String> metadata, Map<String, String> tags) throws 
IOException {
     verifyVolumeName(volumeName);
     verifyBucketName(bucketName);
     if (checkKeyNameEnabled) {
       HddsClientUtils.verifyKeyName(keyName);
     }
     HddsClientUtils.checkNotNull(keyName);
+
+    if (omVersion.compareTo(OzoneManagerVersion.OBJECT_TAG) < 0) {
+      if (tags != null && !tags.isEmpty()) {
+        throw new IOException("OzoneManager does not support object tags");
+      }
+    }
+
     String ownerName = getRealUserInfo().getShortUserName();
 
     OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
@@ -1452,6 +1484,7 @@ public class RpcClient implements ClientProtocol {
         .setDataSize(size)
         .setReplicationConfig(replicationConfig)
         .addAllMetadataGdpr(metadata)
+        .addAllTags(tags)
         .setSortDatanodesInPipeline(true)
         .setAcls(getAclList())
         .setOwnerName(ownerName);
@@ -1722,7 +1755,8 @@ public class RpcClient implements ClientProtocol {
         keyInfo.getModificationTime(), ozoneKeyLocations,
         keyInfo.getReplicationConfig(), keyInfo.getMetadata(),
         keyInfo.getFileEncryptionInfo(),
-        () -> getInputStreamWithRetryFunction(keyInfo), keyInfo.isFile(), 
keyInfo.getOwnerName());
+        () -> getInputStreamWithRetryFunction(keyInfo), keyInfo.isFile(), 
+        keyInfo.getOwnerName(), keyInfo.getTags());
   }
 
   @Override
@@ -1839,6 +1873,18 @@ public class RpcClient implements ClientProtocol {
       ReplicationConfig replicationConfig,
       Map<String, String> metadata)
       throws IOException {
+    return initiateMultipartUpload(volumeName, bucketName, keyName, 
replicationConfig,
+        metadata, Collections.emptyMap());
+  }
+
+  @Override
+  public OmMultipartInfo initiateMultipartUpload(String volumeName,
+      String bucketName,
+      String keyName,
+      ReplicationConfig replicationConfig,
+      Map<String, String> metadata,
+      Map<String, String> tags)
+      throws IOException {
     verifyVolumeName(volumeName);
     verifyBucketName(bucketName);
     HddsClientUtils.checkNotNull(keyName);
@@ -1852,6 +1898,13 @@ public class RpcClient implements ClientProtocol {
             + " Erasure Coded replication.");
       }
     }
+
+    if (omVersion.compareTo(OzoneManagerVersion.OBJECT_TAG) < 0) {
+      if (tags != null && !tags.isEmpty()) {
+        throw new IOException("OzoneManager does not support object tags");
+      }
+    }
+
     OmKeyArgs keyArgs = new OmKeyArgs.Builder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
@@ -1860,6 +1913,7 @@ public class RpcClient implements ClientProtocol {
         .setAcls(getAclList())
         .addAllMetadataGdpr(metadata)
         .setOwnerName(ownerName)
+        .addAllTags(tags)
         .build();
     OmMultipartInfo multipartInfo = ozoneManagerClient
         .initiateMultipartUpload(keyArgs);
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
index e8ad2564f3..19d5ab4fa7 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
@@ -53,6 +53,7 @@ public final class OmKeyArgs implements Auditable {
   private final boolean recursive;
   private final boolean headOp;
   private final boolean forceUpdateContainerCacheFromSCM;
+  private final Map<String, String> tags;
 
   private OmKeyArgs(Builder b) {
     this.volumeName = b.volumeName;
@@ -72,6 +73,7 @@ public final class OmKeyArgs implements Auditable {
     this.headOp = b.headOp;
     this.forceUpdateContainerCacheFromSCM = b.forceUpdateContainerCacheFromSCM;
     this.ownerName = b.ownerName;
+    this.tags = b.tags;
   }
 
   public boolean getIsMultipartKey() {
@@ -150,6 +152,10 @@ public final class OmKeyArgs implements Auditable {
     return forceUpdateContainerCacheFromSCM;
   }
 
+  public Map<String, String> getTags() {
+    return tags;
+  }
+
   @Override
   public Map<String, String> toAuditMap() {
     Map<String, String> auditMap = new LinkedHashMap<>();
@@ -189,7 +195,8 @@ public final class OmKeyArgs implements Auditable {
         .setHeadOp(headOp)
         .setLatestVersionLocation(latestVersionLocation)
         .setAcls(acls)
-        .setForceUpdateContainerCacheFromSCM(forceUpdateContainerCacheFromSCM);
+        .setForceUpdateContainerCacheFromSCM(forceUpdateContainerCacheFromSCM)
+        .addAllTags(tags);
   }
 
   @Nonnull
@@ -228,6 +235,7 @@ public final class OmKeyArgs implements Auditable {
     private boolean recursive;
     private boolean headOp;
     private boolean forceUpdateContainerCacheFromSCM;
+    private final Map<String, String> tags = new HashMap<>();
 
     public Builder setVolumeName(String volume) {
       this.volumeName = volume;
@@ -302,6 +310,16 @@ public final class OmKeyArgs implements Auditable {
       return this;
     }
 
+    public Builder addTag(String key, String value) {
+      this.tags.put(key, value);
+      return this;
+    }
+
+    public Builder addAllTags(Map<String, String> tagmap) {
+      this.tags.putAll(tagmap);
+      return this;
+    }
+
     public Builder setSortDatanodesInPipeline(boolean sort) {
       this.sortDatanodesInPipeline = sort;
       return this;
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index bf31be67c5..c8e7f8f609 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -55,7 +55,7 @@ import org.slf4j.LoggerFactory;
  * datanode. Also, this is the metadata written to om.db on server side.
  */
 public final class OmKeyInfo extends WithParentObjectId
-    implements CopyObject<OmKeyInfo> {
+    implements CopyObject<OmKeyInfo>, WithTags {
   private static final Logger LOG = LoggerFactory.getLogger(OmKeyInfo.class);
 
   private static final Codec<OmKeyInfo> CODEC_TRUE = newCodec(true);
@@ -102,6 +102,11 @@ public final class OmKeyInfo extends WithParentObjectId
    */
   private final CopyOnWriteArrayList<OzoneAcl> acls;
 
+  /**
+   * Used for S3 tags.
+   */
+  private Map<String, String> tags;
+
   private OmKeyInfo(Builder b) {
     super(b);
     this.volumeName = b.volumeName;
@@ -118,6 +123,7 @@ public final class OmKeyInfo extends WithParentObjectId
     this.fileName = b.fileName;
     this.isFile = b.isFile;
     this.ownerName = b.ownerName;
+    this.tags = b.tags;
   }
 
   public String getVolumeName() {
@@ -190,6 +196,16 @@ public final class OmKeyInfo extends WithParentObjectId
     return getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID);
   }
 
+  @Override
+  public Map<String, String> getTags() {
+    return tags;
+  }
+
+  @Override
+  public void setTags(Map<String, String> tags) {
+    this.tags = tags;
+  }
+
   /**
    * updates the length of the each block in the list given.
    * This will be called when the key is being committed to OzoneManager.
@@ -435,6 +451,7 @@ public final class OmKeyInfo extends WithParentObjectId
     private FileChecksum fileChecksum;
 
     private boolean isFile;
+    private final Map<String, String> tags = new HashMap<>();
 
     public Builder() {
     }
@@ -563,6 +580,16 @@ public final class OmKeyInfo extends WithParentObjectId
       return this;
     }
 
+    public Builder addTag(String key, String value) {
+      tags.put(key, value);
+      return this;
+    }
+
+    public Builder addAllTags(Map<String, String> keyTags) {
+      tags.putAll(keyTags);
+      return this;
+    }
+
     public OmKeyInfo build() {
       return new OmKeyInfo(this);
     }
@@ -649,6 +676,7 @@ public final class OmKeyInfo extends WithParentObjectId
         .setCreationTime(creationTime)
         .setModificationTime(modificationTime)
         .addAllMetadata(KeyValueUtil.toProtobuf(getMetadata()))
+        .addAllTags(KeyValueUtil.toProtobuf(getTags()))
         .addAllAcls(OzoneAclUtil.toProtobuf(acls))
         .setObjectID(getObjectID())
         .setUpdateID(getUpdateID())
@@ -696,6 +724,7 @@ public final class OmKeyInfo extends WithParentObjectId
             .fromProto(keyInfo.getType(), keyInfo.getFactor(),
                 keyInfo.getEcReplicationConfig()))
         
.addAllMetadata(KeyValueUtil.getFromProtobuf(keyInfo.getMetadataList()))
+        .addAllTags(KeyValueUtil.getFromProtobuf(keyInfo.getTagsList()))
         .setFileEncryptionInfo(keyInfo.hasFileEncryptionInfo() ?
             OMPBHelper.convert(keyInfo.getFileEncryptionInfo()) : null)
         .setAcls(OzoneAclUtil.fromProtobuf(keyInfo.getAclsList()));
@@ -824,7 +853,11 @@ public final class OmKeyInfo extends WithParentObjectId
                 keyLocationVersion.isMultipartKey())));
 
     if (getMetadata() != null) {
-      getMetadata().forEach((k, v) -> builder.addMetadata(k, v));
+      getMetadata().forEach(builder::addMetadata);
+    }
+
+    if (getTags() != null) {
+      getTags().forEach(builder::addTag);
     }
 
     if (fileChecksum != null) {
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/WithTags.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/WithTags.java
new file mode 100644
index 0000000000..b7f9713ee3
--- /dev/null
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/WithTags.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.helpers;
+
+import java.util.Map;
+
+/**
+ * Interface to handle S3 object / bucket tags.
+ */
+public interface WithTags {
+
+  Map<String, String> getTags();
+
+  void setTags(Map<String, String> tags);
+}
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 61dd3f5660..87e5079f1d 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -714,6 +714,10 @@ public final class 
OzoneManagerProtocolClientSideTranslatorPB
       keyArgs.addAllMetadata(KeyValueUtil.toProtobuf(args.getMetadata()));
     }
 
+    if (args.getTags() != null && args.getTags().size() > 0) {
+      keyArgs.addAllTags(KeyValueUtil.toProtobuf(args.getTags()));
+    }
+
     if (args.getMultipartUploadID() != null) {
       keyArgs.setMultipartUploadID(args.getMultipartUploadID());
     }
@@ -1600,7 +1604,8 @@ public final class 
OzoneManagerProtocolClientSideTranslatorPB
         .addAllMetadata(KeyValueUtil.toProtobuf(omKeyArgs.getMetadata()))
         .setOwnerName(omKeyArgs.getOwner())
         .addAllAcls(omKeyArgs.getAcls().stream().map(a ->
-            OzoneAcl.toProtobuf(a)).collect(Collectors.toList()));
+            OzoneAcl.toProtobuf(a)).collect(Collectors.toList()))
+        .addAllTags(KeyValueUtil.toProtobuf(omKeyArgs.getTags()));
 
     setReplicationConfig(omKeyArgs.getReplicationConfig(), keyArgs);
 
diff --git a/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot 
b/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot
index 96feec2f81..a874ba6007 100644
--- a/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot
@@ -87,7 +87,7 @@ Test Multipart Upload
 
 
 Test Multipart Upload Complete
-    ${result} =         Execute AWSS3APICli     create-multipart-upload 
--bucket ${BUCKET} --key ${PREFIX}/multipartKey1 
--metadata="custom-key1=custom-value1,custom-key2=custom-value2,gdprEnabled=true"
+    ${result} =         Execute AWSS3APICli     create-multipart-upload 
--bucket ${BUCKET} --key ${PREFIX}/multipartKey1 
--metadata="custom-key1=custom-value1,custom-key2=custom-value2,gdprEnabled=true"
 --tagging="tag-key1=tag-value1&tag-key2=tag-value2"
     ${uploadID} =       Execute and checkrc     echo '${result}' | jq -r 
'.UploadId'    0
                         Should contain          ${result}    ${BUCKET}
                         Should contain          ${result}    
${PREFIX}/multipartKey
@@ -126,15 +126,25 @@ Test Multipart Upload Complete
                                 Should contain                ${result}    
\"custom-key1\" : \"custom-value1\"
                                 Should contain                ${result}    
\"custom-key2\" : \"custom-value2\"
                                 Should not contain            ${result}    
\"gdprEnabled\": \"true\"
+                                Should contain                ${result}    
\"tag-key1\" : \"tag-value1\"
+                                Should contain                ${result}    
\"tag-key2\" : \"tag-value2\"
 
-#read file and check the key
+#read file and check the key and tag count
     ${result} =                 Execute AWSS3ApiCli        get-object --bucket 
${BUCKET} --key ${PREFIX}/multipartKey1 /tmp/${PREFIX}-multipartKey1.result
+                                Should contain             ${result}      
TagCount
+
+    ${tagCount} =               Execute and checkrc        echo '${result}' | 
jq -r '.TagCount'    0
+                                Should Be Equal            ${tagCount}    2
+
                                 Execute                    cat /tmp/part1 
/tmp/part2 > /tmp/${PREFIX}-multipartKey1
     Compare files               /tmp/${PREFIX}-multipartKey1         
/tmp/${PREFIX}-multipartKey1.result
 
     ${result} =                 Execute AWSS3ApiCli        get-object --bucket 
${BUCKET} --key ${PREFIX}/multipartKey1 --part-number 1 
/tmp/${PREFIX}-multipartKey1-part1.result
     Compare files               /tmp/part1        
/tmp/${PREFIX}-multipartKey1-part1.result
 
+    ${tagCount} =               Execute and checkrc        echo '${result}' | 
jq -r '.TagCount'    0
+                                Should Be Equal            ${tagCount}    2
+
     ${result} =                 Execute AWSS3ApiCli        get-object --bucket 
${BUCKET} --key ${PREFIX}/multipartKey1 --part-number 2 
/tmp/${PREFIX}-multipartKey1-part2.result
     Compare files               /tmp/part2        
/tmp/${PREFIX}-multipartKey1-part2.result
 
diff --git a/hadoop-ozone/dist/src/main/smoketest/s3/objectcopy.robot 
b/hadoop-ozone/dist/src/main/smoketest/s3/objectcopy.robot
index e2bca772bc..b12199e300 100644
--- a/hadoop-ozone/dist/src/main/smoketest/s3/objectcopy.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/s3/objectcopy.robot
@@ -39,14 +39,14 @@ Copy Object Happy Scenario
                         Execute                    date > /tmp/copyfile
     ${file_checksum} =  Execute                    md5sum /tmp/copyfile | awk 
'{print $1}'
 
-    ${result} =         Execute AWSS3ApiCli        put-object --bucket 
${BUCKET} --key ${PREFIX}/copyobject/key=value/f1 --body /tmp/copyfile 
--metadata="custom-key1=custom-value1,custom-key2=custom-value2,gdprEnabled=true"
+    ${result} =         Execute AWSS3ApiCli        put-object --bucket 
${BUCKET} --key ${PREFIX}/copyobject/key=value/f1 --body /tmp/copyfile 
--metadata="custom-key1=custom-value1,custom-key2=custom-value2,gdprEnabled=true"
 --tagging="tag-key1=tag-value1&tag-key2=tag-value2"
     ${eTag} =           Execute and checkrc        echo '${result}' | jq -r 
'.ETag'  0
                         Should Be Equal            ${eTag}           
\"${file_checksum}\"
 
     ${result} =         Execute AWSS3ApiCli        list-objects --bucket 
${BUCKET} --prefix ${PREFIX}/copyobject/key=value/
                         Should contain             ${result}         f1
 
-    ${result} =         Execute AWSS3ApiCli        copy-object --bucket 
${DESTBUCKET} --key ${PREFIX}/copyobject/key=value/f1 --copy-source 
${BUCKET}/${PREFIX}/copyobject/key=value/f1 
--metadata="custom-key3=custom-value3,custom-key4=custom-value4"
+    ${result} =         Execute AWSS3ApiCli        copy-object --bucket 
${DESTBUCKET} --key ${PREFIX}/copyobject/key=value/f1 --copy-source 
${BUCKET}/${PREFIX}/copyobject/key=value/f1 
--metadata="custom-key3=custom-value3,custom-key4=custom-value4" 
--tagging="tag-key3=tag-value3"
     ${eTag} =           Execute and checkrc        echo '${result}' | jq -r 
'.CopyObjectResult.ETag'  0
                         Should Be Equal            ${eTag}           
\"${file_checksum}\"
 
@@ -54,16 +54,22 @@ Copy Object Happy Scenario
                         Should contain             ${result}         f1
 
     #check that the custom metadata of the source key has been copied to the 
destination key (default copy directive is COPY)
-    ${result} =         Execute AWSS3ApiCli        head-object --bucket 
${BUCKET} --key ${PREFIX}/copyobject/key=value/f1
+    ${result} =         Execute AWSS3ApiCli        head-object --bucket 
${DESTBUCKET} --key ${PREFIX}/copyobject/key=value/f1
                         Should contain             ${result}    
\"custom-key1\": \"custom-value1\"
                         Should contain             ${result}    
\"custom-key2\": \"custom-value2\"
                         # COPY directive ignores any metadata specified in the 
copy object request
                         Should Not contain         ${result}    
\"custom-key3\": \"custom-value3\"
                         Should Not contain         ${result}    
\"custom-key4\": \"custom-value4\"
 
+    #check that the tagging count is accurate
+    ${result} =         Execute AWSS3APICli       get-object --bucket 
${DESTBUCKET} --key ${PREFIX}/copyobject/key=value/f1 /tmp/testfile2.result
+                        Should contain            ${result}   TagCount
+    ${tagCount} =       Execute and checkrc       echo '${result}' | jq -r 
'.TagCount'    0
+                        Should Be Equal           ${tagCount}    2
+
     #copying again will not throw error
     #also uses the REPLACE copy directive
-    ${result} =         Execute AWSS3ApiCli        copy-object --bucket 
${DESTBUCKET} --key ${PREFIX}/copyobject/key=value/f1 --copy-source 
${BUCKET}/${PREFIX}/copyobject/key=value/f1 
--metadata="custom-key3=custom-value3,custom-key4=custom-value4" 
--metadata-directive REPLACE
+    ${result} =         Execute AWSS3ApiCli        copy-object --bucket 
${DESTBUCKET} --key ${PREFIX}/copyobject/key=value/f1 --copy-source 
${BUCKET}/${PREFIX}/copyobject/key=value/f1 
--metadata="custom-key3=custom-value3,custom-key4=custom-value4" 
--metadata-directive REPLACE --tagging="tag-key3=tag-value3" 
--tagging-directive REPLACE
     ${eTag} =           Execute and checkrc        echo '${result}' | jq -r 
'.CopyObjectResult.ETag'  0
                         Should Be Equal            ${eTag}           
\"${file_checksum}\"
 
@@ -75,6 +81,11 @@ Copy Object Happy Scenario
                         # REPLACE directive uses the custom metadata specified 
in the request instead of the source key's custom metadata
                         Should Not contain         ${result}    
\"custom-key1\": \"custom-value1\"
                         Should Not contain         ${result}    
\"custom-key2\": \"custom-value2\"
+    ${result} =         Execute AWSS3APICli        get-object --bucket 
${DESTBUCKET} --key ${PREFIX}/copyobject/key=value/f1 /tmp/testfile2.result
+                        Should contain             ${result}     TagCount
+                        # REPLACE directive uses the tagging header specified 
in the request instead of the source key's tags
+    ${tagCount} =       Execute and checkrc        echo '${result}' | jq -r 
'.TagCount'    0
+                        Should Be Equal            ${tagCount}    1
 
 Copy Object Where Bucket is not available
     ${result} =         Execute AWSS3APICli and checkrc        copy-object 
--bucket dfdfdfdfdfnonexistent --key ${PREFIX}/copyobject/key=value/f1 
--copy-source ${BUCKET}/${PREFIX}/copyobject/key=value/f1      255
@@ -96,6 +107,8 @@ Copy Object Where Key not available
 Copy Object using an invalid copy directive
     ${result} =         Execute AWSS3ApiCli and checkrc        copy-object 
--bucket ${DESTBUCKET} --key ${PREFIX}/copyobject/key=value/f1 --copy-source 
${BUCKET}/${PREFIX}/copyobject/key=value/f1 --metadata-directive INVALID       
255
                         Should contain             ${result}        
InvalidArgument
+    ${result} =         Execute AWSS3ApiCli and checkrc        copy-object 
--bucket ${DESTBUCKET} --key ${PREFIX}/copyobject/key=value/f1 --copy-source 
${BUCKET}/${PREFIX}/copyobject/key=value/f1 --tagging-directive INVALID        
255
+                        Should contain             ${result}        
InvalidArgument
 
 Copy Object with user defined metadata size larger than 2 KB
                                 Execute                    echo "Randomtext" > 
/tmp/testfile2
diff --git a/hadoop-ozone/dist/src/main/smoketest/s3/objectputget.robot 
b/hadoop-ozone/dist/src/main/smoketest/s3/objectputget.robot
index bbff89e71f..4e725b036e 100644
--- a/hadoop-ozone/dist/src/main/smoketest/s3/objectputget.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/s3/objectputget.robot
@@ -162,9 +162,9 @@ Zero byte file
     ${result} =                 Execute AWSS3APICli and checkrc     get-object 
--bucket ${BUCKET} --key ${PREFIX}/putobject/key=value/zerobyte --range 
bytes=0-10000 /tmp/testfile2.result   255
                                 Should contain              ${result}       
InvalidRange
 
-Create file with user defined metadata
+Create file with user defined metadata and tags
                                 Execute                   echo "Randomtext" > 
/tmp/testfile2
-                                Execute AWSS3ApiCli       put-object --bucket 
${BUCKET} --key ${PREFIX}/putobject/custom-metadata/key1 --body /tmp/testfile2 
--metadata="custom-key1=custom-value1,custom-key2=custom-value2"
+                                Execute AWSS3ApiCli       put-object --bucket 
${BUCKET} --key ${PREFIX}/putobject/custom-metadata/key1 --body /tmp/testfile2 
--metadata="custom-key1=custom-value1,custom-key2=custom-value2" 
--tagging="tag-key1=tag-value1&tag-key2=tag-value2"
 
     ${result} =                 Execute AWSS3APICli       head-object --bucket 
${BUCKET} --key ${PREFIX}/putobject/custom-metadata/key1
                                 Should contain            ${result}    
\"custom-key1\": \"custom-value1\"
@@ -173,6 +173,13 @@ Create file with user defined metadata
     ${result} =                 Execute                   ozone sh key info 
/s3v/${BUCKET}/${PREFIX}/putobject/custom-metadata/key1
                                 Should contain            ${result}   
\"custom-key1\" : \"custom-value1\"
                                 Should contain            ${result}   
\"custom-key2\" : \"custom-value2\"
+                                Should contain            ${result}   
\"tag-key1\" : \"tag-value1\"
+                                Should contain            ${result}   
\"tag-key2\" : \"tag-value2\"
+
+    ${result} =                 Execute AWSS3APICli       get-object --bucket 
${BUCKET} --key ${PREFIX}/putobject/custom-metadata/key1 /tmp/testfile2.result
+                                Should contain            ${result}   TagCount
+    ${tagCount} =               Execute and checkrc       echo '${result}' | 
jq -r '.TagCount'    0
+                                Should Be Equal           ${tagCount}    2
 
 Create file with user defined metadata with gdpr enabled value in request
                                 Execute                    echo "Randomtext" > 
/tmp/testfile2
@@ -189,6 +196,27 @@ Create file with user defined metadata size larger than 2 
KB
                                 Should contain                        
${result}   MetadataTooLarge
                                 Should not contain                    
${result}   custom-key1: ${custom_metadata_value}
 
+Create files invalid tags
+    ${result} =                 Execute AWSS3APICli and checkrc       
put-object --bucket ${BUCKET} --key ${PREFIX}/putobject/custom-metadata/key2 
--body /tmp/testfile2 --tagging="tag-key1=tag-value1&tag-key1=tag-value2"    255
+                                Should contain                        
${result}   InvalidTag
+    ${long_tag_key} =           Execute                               printf 
'v%.0s' {1..129}
+    ${result} =                 Execute AWSS3APICli and checkrc       
put-object --bucket ${BUCKET} --key ${PREFIX}/putobject/custom-metadata/key2 
--body /tmp/testfile2 --tagging="${long_tag_key}=tag-value1"    255
+                                Should contain                        
${result}   InvalidTag
+    ${long_tag_value} =         Execute                               printf 
'v%.0s' {1..257}
+    ${result} =                 Execute AWSS3APICli and checkrc       
put-object --bucket ${BUCKET} --key ${PREFIX}/putobject/custom-metadata/key2 
--body /tmp/testfile2 --tagging="tag-key1=${long_tag_value}"    255
+                                Should contain                        
${result}   InvalidTag
+
+Create files with too many tags
+                                Execute                    echo "Randomtext" > 
/tmp/testfile2
+    @{tags_list} =              Create List
+    FOR    ${i}    IN RANGE     11
+        Append To List    ${tags_list}    tag-key-${i}=tag-value-${i}
+    END
+
+    ${tags_over_limit} =        Catenate    SEPARATOR=&    @{tags_list}
+    ${result} =                 Execute AWSS3APICli and checkrc       
put-object --bucket ${BUCKET} --key ${PREFIX}/putobject/custom-metadata/key2 
--body /tmp/testfile2 --tagging="${tags_over_limit}"    255
+                                Should contain                        
${result}   InvalidTag
+
 Create small file and expect ETag (MD5) in a reponse header
                                 Execute                    head -c 1MB 
</dev/urandom > /tmp/small_file
     ${file_md5_checksum} =      Execute                    md5sum 
/tmp/small_file | awk '{print $1}'
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 0244e5607e..d96d8d0cae 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -1512,8 +1512,17 @@ public abstract class TestOzoneRpcClientAbstract {
                         ReplicationFactor replication, String value,
                         int valueLength)
       throws IOException {
-    OzoneOutputStream out = bucket.createKey(keyName, valueLength, RATIS,
-        replication, new HashMap<>());
+    writeKey(bucket, keyName, replication, value, valueLength,
+        Collections.emptyMap(), Collections.emptyMap());
+  }
+
+  private void writeKey(OzoneBucket bucket, String keyName,
+                        ReplicationFactor replication, String value,
+                        int valueLength, Map<String, String> customMetadata,
+                        Map<String, String> tags)
+      throws IOException {
+    OzoneOutputStream out = bucket.createKey(keyName, valueLength,
+        ReplicationConfig.fromTypeAndFactor(RATIS, replication), 
customMetadata, tags);
     out.write(value.getBytes(UTF_8));
     out.close();
   }
@@ -2570,6 +2579,46 @@ public abstract class TestOzoneRpcClientAbstract {
     }
   }
 
+  @ParameterizedTest
+  @MethodSource("bucketLayouts")
+  public void testCreateKeyWithMetadataAndTags(BucketLayout bucketLayout) 
throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+    String value = "sample value";
+    OzoneVolume volume = null;
+    store.createVolume(volumeName);
+
+    volume = store.getVolume(volumeName);
+    BucketArgs bucketArgs =
+            BucketArgs.newBuilder().setBucketLayout(bucketLayout).build();
+    volume.createBucket(bucketName, bucketArgs);
+
+    OzoneBucket ozoneBucket = volume.getBucket(bucketName);
+
+    Map<String, String> customMetadata = new HashMap<>();
+    customMetadata.put("custom-key1", "custom-value1");
+    customMetadata.put("custom-key2", "custom-value2");
+
+    Map<String, String> tags = new HashMap<>();
+    tags.put("tag-key1", "tag-value1");
+    tags.put("tag-key2", "tag-value2");
+
+    writeKey(ozoneBucket, keyName, ONE, value, value.length(), customMetadata, 
tags);
+
+    OzoneKeyDetails keyDetails = ozoneBucket.getKey(keyName);
+
+    Map<String, String> keyMetadata = keyDetails.getMetadata();
+
+    Map<String, String> keyTags = keyDetails.getTags();
+
+    assertThat(keyMetadata).containsAllEntriesOf(customMetadata);
+    assertThat(keyMetadata).doesNotContainKeys("tag-key1", "tag-key2");
+
+    assertThat(keyTags).containsAllEntriesOf(keyTags);
+    assertThat(keyTags).doesNotContainKeys("custom-key1", "custom-key2");
+  }
+
   static Stream<ReplicationConfig> replicationConfigs() {
     return Stream.of(
         RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
@@ -3119,7 +3168,41 @@ public abstract class TestOzoneRpcClientAbstract {
     customMetadata.put("custom-key1", "custom-value1");
     customMetadata.put("custom-key2", "custom-value2");
 
-    doMultipartUpload(bucket, keyName, (byte) 98, replication, customMetadata);
+    doMultipartUpload(bucket, keyName, (byte) 98, replication, customMetadata, 
Collections.emptyMap());
+  }
+
+  @ParameterizedTest
+  @MethodSource({"replicationConfigs"})
+  public void testMultipartUploadWithTags(ReplicationConfig replication) 
throws Exception {
+    testMultipartUploadWithTags(replication, BucketLayout.OBJECT_STORE);
+  }
+
+  @ParameterizedTest
+  @MethodSource({"bucketLayouts"})
+  public void testMultipartUploadWithTags(BucketLayout bucketLayout) throws 
Exception {
+    
testMultipartUploadWithTags(RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
 bucketLayout);
+  }
+
+  private void testMultipartUploadWithTags(ReplicationConfig replication, 
BucketLayout bucketLayout)
+      throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+
+    BucketArgs bucketArgs =
+        BucketArgs.newBuilder().setBucketLayout(bucketLayout).build();
+    volume.createBucket(bucketName, bucketArgs);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    // Create tags
+    Map<String, String> tags = new HashMap<>();
+    tags.put("tag-key1", "tag-value1");
+    tags.put("tag-key2", "tag-value2");
+
+    doMultipartUpload(bucket, keyName, (byte) 96, replication, 
Collections.emptyMap(), tags);
   }
 
   @Test
@@ -3755,14 +3838,14 @@ public abstract class TestOzoneRpcClientAbstract {
   private void doMultipartUpload(OzoneBucket bucket, String keyName, byte val,
       ReplicationConfig replication)
       throws Exception {
-    doMultipartUpload(bucket, keyName, val, replication, 
Collections.emptyMap());
+    doMultipartUpload(bucket, keyName, val, replication, 
Collections.emptyMap(), Collections.emptyMap());
   }
 
   private void doMultipartUpload(OzoneBucket bucket, String keyName, byte val,
-      ReplicationConfig replication, Map<String, String> customMetadata)
+      ReplicationConfig replication, Map<String, String> customMetadata, 
Map<String, String> tags)
       throws Exception {
     // Initiate Multipart upload request
-    String uploadID = initiateMultipartUpload(bucket, keyName, replication, 
customMetadata);
+    String uploadID = initiateMultipartUpload(bucket, keyName, replication, 
customMetadata, tags);
 
     // Upload parts
     Map<Integer, String> partsMap = new TreeMap<>();
@@ -3835,17 +3918,23 @@ public abstract class TestOzoneRpcClientAbstract {
     if (customMetadata != null && !customMetadata.isEmpty()) {
       assertThat(keyMetadata).containsAllEntriesOf(customMetadata);
     }
+
+    Map<String, String> keyTags = omKeyInfo.getTags();
+    if (keyTags != null && !keyTags.isEmpty()) {
+      assertThat(keyTags).containsAllEntriesOf(tags);
+    }
   }
 
   private String initiateMultipartUpload(OzoneBucket bucket, String keyName,
       ReplicationConfig replicationConfig) throws Exception {
-    return initiateMultipartUpload(bucket, keyName, replicationConfig, 
Collections.emptyMap());
+    return initiateMultipartUpload(bucket, keyName, replicationConfig, 
Collections.emptyMap(), Collections.emptyMap());
   }
 
   private String initiateMultipartUpload(OzoneBucket bucket, String keyName,
-      ReplicationConfig replicationConfig, Map<String, String> customMetadata) 
throws Exception {
+      ReplicationConfig replicationConfig, Map<String, String> customMetadata,
+      Map<String, String> tags) throws Exception {
     OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
-        replicationConfig, customMetadata);
+        replicationConfig, customMetadata, tags);
 
     String uploadID = multipartInfo.getUploadID();
     assertNotNull(uploadID);
diff --git 
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto 
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 4a18f308c9..e4559ad735 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1026,6 +1026,9 @@ message KeyArgs {
     // Force OM to update container cache location from SCL
     optional bool forceUpdateContainerCacheFromSCM = 20;
     optional string ownerName = 21;
+
+    // S3 object tags support
+    repeated hadoop.hdds.KeyValue tags = 22;
 }
 
 message KeyLocation {
@@ -1109,6 +1112,7 @@ message KeyInfo {
     optional FileChecksumProto fileChecksum = 18;
     optional bool isFile = 19;
     optional string ownerName = 20;
+    repeated hadoop.hdds.KeyValue tags = 21;
 }
 
 message BasicKeyInfo {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 1addd2431b..50bb1053be 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -780,6 +780,12 @@ public abstract class OMKeyRequest extends OMClientRequest 
{
       dbKeyInfo.getMetadata().putAll(KeyValueUtil.getFromProtobuf(
           keyArgs.getMetadataList()));
 
+      // Construct a new tags from KeyArgs
+      // Clear the old one when the key is overwritten
+      dbKeyInfo.getTags().clear();
+      dbKeyInfo.getTags().putAll(KeyValueUtil.getFromProtobuf(
+          keyArgs.getTagsList()));
+
       dbKeyInfo.setFileEncryptionInfo(encInfo);
       return dbKeyInfo;
     }
@@ -821,6 +827,8 @@ public abstract class OMKeyRequest extends OMClientRequest {
                 keyArgs, omBucketInfo, omPathInfo, prefixManager))
             .addAllMetadata(KeyValueUtil.getFromProtobuf(
                     keyArgs.getMetadataList()))
+            .addAllTags(KeyValueUtil.getFromProtobuf(
+                    keyArgs.getTagsList()))
             .setUpdateID(transactionLogIndex)
             .setOwnerName(keyArgs.getOwnerName())
             .setFile(true);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
index e5f8264471..f16ef9f8f4 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
@@ -216,6 +216,7 @@ public class S3InitiateMultipartUploadRequest extends 
OMKeyRequest {
               OMPBHelper.convert(keyArgs.getFileEncryptionInfo()) : null)
           
.addAllMetadata(KeyValueUtil.getFromProtobuf(keyArgs.getMetadataList()))
           .setOwnerName(keyArgs.getOwnerName())
+          .addAllTags(KeyValueUtil.getFromProtobuf(keyArgs.getTagsList()))
           .build();
 
       // Add to cache
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
index 062af1214d..d942cb8a2b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
@@ -192,6 +192,7 @@ public class S3InitiateMultipartUploadRequestWithFSO
               OMPBHelper.convert(keyArgs.getFileEncryptionInfo()) : null)
           .setParentObjectID(pathInfoFSO.getLastKnownParentId())
           
.addAllMetadata(KeyValueUtil.getFromProtobuf(keyArgs.getMetadataList()))
+          .addAllTags(KeyValueUtil.getFromProtobuf(keyArgs.getTagsList()))
           .build();
       
       // validate and update namespace for missing parent directory
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
index 8d9406ab0e..1dab110e78 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
@@ -495,7 +495,8 @@ public class S3MultipartUploadCompleteRequest extends 
OMKeyRequest {
           .addAllMetadata(dbOpenKeyInfo.getMetadata())
           .addMetadata(OzoneConsts.ETAG,
               multipartUploadedKeyHash(partKeyInfoMap))
-          .setOwnerName(keyArgs.getOwnerName());
+          .setOwnerName(keyArgs.getOwnerName())
+          .addAllTags(dbOpenKeyInfo.getTags());
       // Check if db entry has ObjectID. This check is required because
       // it is possible that between multipart key uploads and complete,
       // we had an upgrade.
@@ -529,6 +530,9 @@ public class S3MultipartUploadCompleteRequest extends 
OMKeyRequest {
       }
       omKeyInfo.getMetadata().put(OzoneConsts.ETAG,
           multipartUploadedKeyHash(partKeyInfoMap));
+      if (dbOpenKeyInfo.getTags() != null) {
+        omKeyInfo.setTags(dbOpenKeyInfo.getTags());
+      }
     }
     omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
     return omKeyInfo;
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
index 8103f6616c..5fb5b9dca3 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
@@ -993,7 +993,8 @@ public final class OMRequestTestUtils {
    */
   public static OMRequest createInitiateMPURequest(String volumeName,
       String bucketName, String keyName) {
-    return createInitiateMPURequest(volumeName, bucketName, keyName, 
Collections.emptyMap());
+    return createInitiateMPURequest(volumeName, bucketName, keyName, 
Collections.emptyMap(),
+        Collections.emptyMap());
   }
 
   /**
@@ -1004,7 +1005,8 @@ public final class OMRequestTestUtils {
    * @param metadata
    */
   public static OMRequest createInitiateMPURequest(String volumeName,
-      String bucketName, String keyName, Map<String, String> metadata) {
+      String bucketName, String keyName, Map<String, String> metadata,
+      Map<String, String> tags) {
     MultipartInfoInitiateRequest
         multipartInfoInitiateRequest =
         MultipartInfoInitiateRequest.newBuilder().setKeyArgs(
@@ -1013,6 +1015,7 @@ public final class OMRequestTestUtils {
                 .setKeyName(keyName)
                 .setBucketName(bucketName)
                 .addAllMetadata(KeyValueUtil.toProtobuf(metadata))
+                .addAllTags(KeyValueUtil.toProtobuf(tags))
             )
             .build();
 
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
index 0790e2af3b..166edb552c 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.ozone.om.PrefixManager;
 import org.apache.hadoop.ozone.om.PrefixManagerImpl;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -142,8 +143,12 @@ public class TestOMKeyCreateRequest extends 
TestOMKeyRequest {
     when(ozoneManager.getOzoneLockProvider()).thenReturn(
         new OzoneLockProvider(setKeyPathLock, setFileSystemPaths));
 
+    Map<String, String> tags = new HashMap<>();
+    tags.put("tag-key1", "tag-value1");
+    tags.put("tag-key2", "tag-value2");
+
     OMRequest modifiedOmRequest =
-        doPreExecute(createKeyRequest(false, 0));
+        doPreExecute(createKeyRequest(false, 0, Collections.emptyMap(), tags));
 
     OMKeyCreateRequest omKeyCreateRequest =
         getOMKeyCreateRequest(modifiedOmRequest);
@@ -175,10 +180,10 @@ public class TestOMKeyCreateRequest extends 
TestOMKeyRequest {
         .getCreateKeyResponse().getKeyInfo().getKeyLocationListCount());
 
     // Disk should have 1 version, as it is fresh key create.
-    assertEquals(1,
-        omMetadataManager.getOpenKeyTable(
-                omKeyCreateRequest.getBucketLayout())
-            .get(openKey).getKeyLocationVersions().size());
+    OmKeyInfo openKeyInfo = 
omMetadataManager.getOpenKeyTable(omKeyCreateRequest.getBucketLayout()).get(openKey);
+
+    assertEquals(1, openKeyInfo.getKeyLocationVersions().size());
+    assertThat(openKeyInfo.getTags()).containsAllEntriesOf(tags);
 
     // Write to DB like key commit.
     omMetadataManager.getKeyTable(omKeyCreateRequest.getBucketLayout())
@@ -186,9 +191,13 @@ public class TestOMKeyCreateRequest extends 
TestOMKeyRequest {
             .getOpenKeyTable(omKeyCreateRequest.getBucketLayout())
             .get(openKey));
 
+    tags.remove("tag-key1");
+    tags.remove("tag-key2");
+    tags.put("tag-key3", "tag-value3");
+
     // Override same key again
     modifiedOmRequest =
-        doPreExecute(createKeyRequest(false, 0));
+        doPreExecute(createKeyRequest(false, 0, Collections.emptyMap(), tags));
 
     id = modifiedOmRequest.getCreateKeyRequest().getClientID();
     openKey = getOpenKey(id);
@@ -218,6 +227,11 @@ public class TestOMKeyCreateRequest extends 
TestOMKeyRequest {
         omMetadataManager.getOpenKeyTable(
                 omKeyCreateRequest.getBucketLayout())
             .get(openKey).getKeyLocationVersions().size());
+    openKeyInfo = 
omMetadataManager.getOpenKeyTable(omKeyCreateRequest.getBucketLayout()).get(openKey);
+
+    assertEquals(1, openKeyInfo.getKeyLocationVersions().size());
+    assertThat(openKeyInfo.getTags()).containsAllEntriesOf(tags);
+    assertThat(openKeyInfo.getTags()).doesNotContainKeys("tag-key1", 
"tag-key2");
 
   }
 
@@ -643,7 +657,12 @@ public class TestOMKeyCreateRequest extends 
TestOMKeyRequest {
 
   @SuppressWarnings("parameterNumber")
   protected OMRequest createKeyRequest(boolean isMultipartKey, int partNumber) 
{
-    return createKeyRequest(isMultipartKey, partNumber, keyName);
+    return createKeyRequest(isMultipartKey, partNumber, 
Collections.emptyMap(), Collections.emptyMap());
+  }
+
+  protected OMRequest createKeyRequest(boolean isMultipartKey, int partNumber,
+                                       Map<String, String> metadata, 
Map<String, String> tags) {
+    return createKeyRequest(isMultipartKey, partNumber, keyName, metadata, 
tags);
   }
 
   private OMRequest createKeyRequest(boolean isMultipartKey, int partNumber,
@@ -651,6 +670,12 @@ public class TestOMKeyCreateRequest extends 
TestOMKeyRequest {
     return createKeyRequest(isMultipartKey, partNumber, keyName, null);
   }
 
+  protected OMRequest createKeyRequest(boolean isMultipartKey, int partNumber,
+                                       String keyName,
+                                       Map<String, String> metadata) {
+    return createKeyRequest(isMultipartKey, partNumber, keyName, metadata, 
null);
+  }
+
   /**
    * Create OMRequest which encapsulates a CreateKeyRequest, optionally
    * with metadata.
@@ -661,11 +686,14 @@ public class TestOMKeyCreateRequest extends 
TestOMKeyRequest {
    * @param keyName        The name of the key to create or update.
    * @param metadata       Optional metadata for the key. Pass null or an empty
    *                       map if no metadata is to be set.
+   * @param tags           Optional tags for the key. Pass null or an empty
+   *                       map if no tags is to be set.
    * @return OMRequest configured with the provided parameters.
    */
   protected OMRequest createKeyRequest(boolean isMultipartKey, int partNumber,
                                        String keyName,
-                                       Map<String, String> metadata) {
+                                       Map<String, String> metadata,
+                                       Map<String, String> tags) {
     KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
@@ -689,6 +717,10 @@ public class TestOMKeyCreateRequest extends 
TestOMKeyRequest {
           .build()));
     }
 
+    if (tags != null && !tags.isEmpty()) {
+      keyArgs.addAllTags(KeyValueUtil.toProtobuf(tags));
+    }
+
     OzoneManagerProtocolProtos.CreateKeyRequest createKeyRequest =
         CreateKeyRequest.newBuilder().setKeyArgs(keyArgs).build();
 
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java
index 0165716231..30b76801d9 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java
@@ -68,8 +68,12 @@ public class TestS3InitiateMultipartUploadRequest
     customMetadata.put("custom-key1", "custom-value1");
     customMetadata.put("custom-key2", "custom-value2");
 
+    Map<String, String> tags = new HashMap<>();
+    tags.put("tag-key1", "tag-value1");
+    tags.put("tag-key2", "tag-value2");
+
     OMRequest modifiedRequest = doPreExecuteInitiateMPU(volumeName,
-        bucketName, keyName, customMetadata);
+        bucketName, keyName, customMetadata, tags);
 
     S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
         getS3InitiateMultipartUploadReq(modifiedRequest);
@@ -93,6 +97,9 @@ public class TestS3InitiateMultipartUploadRequest
     assertNotNull(openMPUKeyInfo.getMetadata());
     assertEquals("custom-value1", 
openMPUKeyInfo.getMetadata().get("custom-key1"));
     assertEquals("custom-value2", 
openMPUKeyInfo.getMetadata().get("custom-key2"));
+    assertNotNull(openMPUKeyInfo.getTags());
+    assertEquals("tag-value1", openMPUKeyInfo.getTags().get("tag-key1"));
+    assertEquals("tag-value2", openMPUKeyInfo.getTags().get("tag-key2"));
 
     assertNotNull(omMetadataManager.getMultipartInfoTable().get(multipartKey));
 
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequestWithFSO.java
index dd8eb00edb..743cd36e15 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequestWithFSO.java
@@ -68,11 +68,15 @@ public class TestS3InitiateMultipartUploadRequestWithFSO
     customMetadata.put("custom-key1", "custom-value1");
     customMetadata.put("custom-key2", "custom-value2");
 
+    Map<String, String> tags = new HashMap<>();
+    tags.put("tag-key1", "tag-value1");
+    tags.put("tag-key2", "tag-value2");
+
     final long volumeId = omMetadataManager.getVolumeId(volumeName);
     final long bucketId = omMetadataManager.getBucketId(volumeName,
             bucketName);
     OMRequest modifiedRequest = doPreExecuteInitiateMPUWithFSO(volumeName,
-        bucketName, keyName, customMetadata);
+        bucketName, keyName, customMetadata, tags);
 
     S3InitiateMultipartUploadRequest s3InitiateMultipartUploadReqFSO =
         getS3InitiateMultipartUploadReq(modifiedRequest);
@@ -111,6 +115,9 @@ public class TestS3InitiateMultipartUploadRequestWithFSO
     assertNotNull(omKeyInfo.getMetadata());
     assertEquals("custom-value1", omKeyInfo.getMetadata().get("custom-key1"));
     assertEquals("custom-value2", omKeyInfo.getMetadata().get("custom-key2"));
+    assertNotNull(omKeyInfo.getTags());
+    assertEquals("tag-value1", omKeyInfo.getTags().get("tag-key1"));
+    assertEquals("tag-value2", omKeyInfo.getTags().get("tag-key2"));
 
     OmMultipartKeyInfo omMultipartKeyInfo = omMetadataManager
             .getMultipartInfoTable().get(multipartFileKey);
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
index 1972fee69b..bd93fe176e 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
@@ -148,9 +148,26 @@ public class TestS3MultipartRequest {
   protected OMRequest doPreExecuteInitiateMPU(
       String volumeName, String bucketName, String keyName,
       Map<String, String> metadata) throws Exception {
+    return doPreExecuteInitiateMPU(volumeName, bucketName, keyName, metadata,
+        Collections.emptyMap());
+  }
+
+  /**
+   * Perform preExecute of Initiate Multipart upload request for given
+   * volume, bucket and key name.
+   * @param volumeName
+   * @param bucketName
+   * @param keyName
+   * @param metadata
+   * @param tags
+   * @return OMRequest - returned from preExecute.
+   */
+  protected OMRequest doPreExecuteInitiateMPU(
+      String volumeName, String bucketName, String keyName,
+      Map<String, String> metadata, Map<String, String> tags) throws Exception 
{
     OMRequest omRequest =
         OMRequestTestUtils.createInitiateMPURequest(volumeName, bucketName,
-            keyName, metadata);
+            keyName, metadata, tags);
 
     S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
         getS3InitiateMultipartUploadReq(omRequest);
@@ -173,6 +190,14 @@ public class TestS3MultipartRequest {
       assertThat(modifiedKeyMetadata).containsAllEntriesOf(metadata);
     }
 
+    if (tags != null) {
+      Map<String, String> modifiedKeyTags = KeyValueUtil.getFromProtobuf(
+          modifiedRequest.getInitiateMultiPartUploadRequest()
+              .getKeyArgs().getTagsList());
+
+      assertThat(modifiedKeyTags).containsAllEntriesOf(tags);
+    }
+
     return modifiedRequest;
   }
 
@@ -273,7 +298,8 @@ public class TestS3MultipartRequest {
    */
   protected OMRequest doPreExecuteInitiateMPUWithFSO(
       String volumeName, String bucketName, String keyName) throws Exception {
-    return doPreExecuteInitiateMPUWithFSO(volumeName, bucketName, keyName, 
Collections.emptyMap());
+    return doPreExecuteInitiateMPUWithFSO(volumeName, bucketName, keyName,
+        Collections.emptyMap(), Collections.emptyMap());
   }
 
   /**
@@ -283,14 +309,15 @@ public class TestS3MultipartRequest {
    * @param bucketName
    * @param keyName
    * @param metadata
+   * @param tags
    * @return OMRequest - returned from preExecute.
    */
   protected OMRequest doPreExecuteInitiateMPUWithFSO(
       String volumeName, String bucketName, String keyName,
-      Map<String, String> metadata) throws Exception {
+      Map<String, String> metadata, Map<String, String> tags) throws Exception 
{
     OMRequest omRequest =
         OMRequestTestUtils.createInitiateMPURequest(volumeName, bucketName,
-            keyName, metadata);
+            keyName, metadata, tags);
 
     S3InitiateMultipartUploadRequestWithFSO
         s3InitiateMultipartUploadRequestWithFSO =
@@ -314,6 +341,14 @@ public class TestS3MultipartRequest {
       assertThat(modifiedKeyMetadata).containsAllEntriesOf(metadata);
     }
 
+    if (tags != null) {
+      Map<String, String> modifiedKeyTags = KeyValueUtil.getFromProtobuf(
+          modifiedRequest.getInitiateMultiPartUploadRequest()
+              .getKeyArgs().getTagsList());
+
+      assertThat(modifiedKeyTags).containsAllEntriesOf(tags);
+    }
+
     return modifiedRequest;
   }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java
index 663f2925cb..db77d29ee7 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java
@@ -78,17 +78,26 @@ public class TestS3MultipartUploadCompleteRequest
     customMetadata.put("custom-key1", "custom-value1");
     customMetadata.put("custom-key2", "custom-value2");
 
+    Map<String, String> tags = new HashMap<>();
+    tags.put("tag-key1", "tag-value1");
+    tags.put("tag-key2", "tag-value2");
+
+
     String uploadId = checkValidateAndUpdateCacheSuccess(
-        volumeName, bucketName, keyName, customMetadata);
+        volumeName, bucketName, keyName, customMetadata, tags);
     checkDeleteTableCount(volumeName, bucketName, keyName, 0, uploadId);
 
     customMetadata.remove("custom-key1");
     customMetadata.remove("custom-key2");
     customMetadata.put("custom-key3", "custom-value3");
 
+    tags.remove("tag-key1");
+    tags.remove("tag-key2");
+    tags.put("tag-key3", "tag-value3");
+
     // Do it twice to test overwrite
     uploadId = checkValidateAndUpdateCacheSuccess(volumeName, bucketName,
-        keyName, customMetadata);
+        keyName, customMetadata, tags);
     // After overwrite, one entry must be in delete table
     checkDeleteTableCount(volumeName, bucketName, keyName, 1, uploadId);
   }
@@ -116,10 +125,10 @@ public class TestS3MultipartUploadCompleteRequest
   }
 
   private String checkValidateAndUpdateCacheSuccess(String volumeName,
-      String bucketName, String keyName, Map<String, String> metadata) throws 
Exception {
+      String bucketName, String keyName, Map<String, String> metadata, 
Map<String, String> tags) throws Exception {
 
     OMRequest initiateMPURequest = doPreExecuteInitiateMPU(volumeName,
-        bucketName, keyName, metadata);
+        bucketName, keyName, metadata, tags);
 
     S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
         getS3InitiateMultipartUploadReq(initiateMPURequest);
@@ -188,6 +197,9 @@ public class TestS3MultipartUploadCompleteRequest
     if (metadata != null) {
       
assertThat(multipartKeyInfo.getMetadata()).containsAllEntriesOf(metadata);
     }
+    if (tags != null) {
+      assertThat(multipartKeyInfo.getTags()).containsAllEntriesOf(tags);
+    }
 
     OmBucketInfo omBucketInfo = omMetadataManager.getBucketTable()
         .getCacheValue(new CacheKey<>(
diff --git 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
index 136e47c776..4ffc301193 100644
--- 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
+++ 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
@@ -62,14 +62,22 @@ import org.apache.hadoop.ozone.s3.metrics.S3GatewayMetrics;
 import org.apache.hadoop.ozone.s3.signature.SignatureInfo;
 import org.apache.hadoop.ozone.s3.util.AuditUtils;
 import org.apache.hadoop.util.Time;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URLEncodedUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.ozone.OzoneConsts.ETAG;
 import static org.apache.hadoop.ozone.OzoneConsts.KB;
+import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_TAG;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.newError;
 import static 
org.apache.hadoop.ozone.s3.util.S3Consts.CUSTOM_METADATA_HEADER_PREFIX;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_HEADER;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_KEY_LENGTH_LIMIT;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_NUM_LIMIT;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_REGEX_PATTERN;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_VALUE_LENGTH_LIMIT;
 
 /**
  * Basic helpers for all the REST endpoints.
@@ -345,6 +353,82 @@ public abstract class EndpointBase implements Auditor {
     }
   }
 
+  protected Map<String, String> getTaggingFromHeaders(HttpHeaders httpHeaders)
+      throws OS3Exception {
+    String tagString = httpHeaders.getHeaderString(TAG_HEADER);
+
+    if (StringUtils.isEmpty(tagString)) {
+      return Collections.emptyMap();
+    }
+
+    List<NameValuePair> tagPairs = URLEncodedUtils.parse(tagString, UTF_8);
+
+    if (tagPairs.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    Map<String, String> tags = new HashMap<>();
+    // Tag restrictions: 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_control_S3Tag.html
+    for (NameValuePair tagPair: tagPairs) {
+      if (StringUtils.isEmpty(tagPair.getName())) {
+        OS3Exception ex = newError(INVALID_TAG, TAG_HEADER);
+        ex.setErrorMessage("Some tag keys are empty, please specify the 
non-empty tag keys");
+        throw ex;
+      }
+
+      if (tagPair.getValue() == null) {
+        // For example for query parameter with only value (e.g. "tag1")
+        OS3Exception ex = newError(INVALID_TAG, tagPair.getName());
+        ex.setErrorMessage("Some tag values are not specified, please specify 
the tag values");
+        throw ex;
+      }
+
+      if (tags.containsKey(tagPair.getName())) {
+        // Tags that are associated with an object must have unique tag keys
+        // Reject request if the same key is used twice on the same resource
+        OS3Exception ex = newError(INVALID_TAG, tagPair.getName());
+        ex.setErrorMessage("There are tags with duplicate tag keys, tag keys 
should be unique");
+        throw ex;
+      }
+
+      if (tagPair.getName().length() > TAG_KEY_LENGTH_LIMIT) {
+        OS3Exception ex = newError(INVALID_TAG, tagPair.getName());
+        ex.setErrorMessage("The tag key exceeds the maximum length of " + 
TAG_KEY_LENGTH_LIMIT);
+        throw ex;
+      }
+
+      if (tagPair.getValue().length() > TAG_VALUE_LENGTH_LIMIT) {
+        OS3Exception ex = newError(INVALID_TAG, tagPair.getValue());
+        ex.setErrorMessage("The tag value exceeds the maximum length of " + 
TAG_VALUE_LENGTH_LIMIT);
+        throw ex;
+      }
+
+      if (!TAG_REGEX_PATTERN.matcher(tagPair.getName()).matches()) {
+        OS3Exception ex = newError(INVALID_TAG, tagPair.getName());
+        ex.setErrorMessage("The tag key does not have a valid pattern");
+        throw ex;
+      }
+
+      if (!TAG_REGEX_PATTERN.matcher(tagPair.getValue()).matches()) {
+        OS3Exception ex = newError(INVALID_TAG, tagPair.getValue());
+        ex.setErrorMessage("The tag value does not have a valid pattern");
+        throw ex;
+      }
+
+      tags.put(tagPair.getName(), tagPair.getValue());
+    }
+
+    if (tags.size() > TAG_NUM_LIMIT) {
+      // You can associate up to 10 tags with an object.
+      OS3Exception ex = S3ErrorTable.newError(INVALID_TAG, TAG_HEADER);
+      ex.setErrorMessage("The number of tags " + tags.size() +
+          " exceeded the maximum number of tags of " + TAG_NUM_LIMIT);
+      throw ex;
+    }
+
+    return tags;
+  }
+
   private AuditMessage.Builder auditMessageBaseBuilder(AuditAction op,
       Map<String, String> auditMap) {
     AuditMessage.Builder builder = new AuditMessage.Builder()
diff --git 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 301e47bffa..b3332efbe2 100644
--- 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -135,6 +135,8 @@ import static 
org.apache.hadoop.ozone.s3.util.S3Consts.RANGE_HEADER;
 import static 
org.apache.hadoop.ozone.s3.util.S3Consts.RANGE_HEADER_SUPPORTED_UNIT;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.CopyDirective;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_COUNT_HEADER;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_DIRECTIVE_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Utils.urlDecode;
 
 /**
@@ -299,6 +301,8 @@ public class ObjectEndpoint extends EndpointBase {
         digestInputStream = new DigestInputStream(body, 
getMessageDigestInstance());
       }
 
+      Map<String, String> tags = getTaggingFromHeaders(headers);
+
       long putLength;
       String eTag = null;
       if (datastreamEnabled && !enableEC && length > datastreamMinLength) {
@@ -311,7 +315,7 @@ public class ObjectEndpoint extends EndpointBase {
       } else {
         try (OzoneOutputStream output = getClientProtocol().createKey(
             volume.getName(), bucketName, keyPath, length, replicationConfig,
-            customMetadata)) {
+            customMetadata, tags)) {
           long metadataLatencyNs =
               getMetrics().updatePutKeyMetadataStats(startNanos);
           perf.appendMetaLatencyNanos(metadataLatencyNs);
@@ -513,6 +517,7 @@ public class ObjectEndpoint extends EndpointBase {
         }
       }
       addLastModifiedDate(responseBuilder, keyDetails);
+      addTagCountIfAny(responseBuilder, keyDetails);
       long metadataLatencyNs =
           getMetrics().updateGetKeyMetadataStats(startNanos);
       perf.appendMetaLatencyNanos(metadataLatencyNs);
@@ -554,6 +559,17 @@ public class ObjectEndpoint extends EndpointBase {
             RFC1123Util.FORMAT.format(lastModificationTime));
   }
 
+  static void addTagCountIfAny(
+      ResponseBuilder responseBuilder, OzoneKey key) {
+    // See x-amz-tagging-count in 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
+    // The number of tags, IF ANY, on the object, when you have the relevant
+    // permission to read object tags
+    if (!key.getTags().isEmpty()) {
+      responseBuilder
+          .header(TAG_COUNT_HEADER, key.getTags().size());
+    }
+  }
+
   /**
    * Rest endpoint to check existence of an object in a bucket.
    * <p>
@@ -749,11 +765,13 @@ public class ObjectEndpoint extends EndpointBase {
       Map<String, String> customMetadata =
           getCustomMetadataFromHeaders(headers.getRequestHeaders());
 
+      Map<String, String> tags = getTaggingFromHeaders(headers);
+
       ReplicationConfig replicationConfig =
           getReplicationConfig(ozoneBucket, storageType);
 
       OmMultipartInfo multipartInfo =
-          ozoneBucket.initiateMultipartUpload(key, replicationConfig, 
customMetadata);
+          ozoneBucket.initiateMultipartUpload(key, replicationConfig, 
customMetadata, tags);
 
       MultipartUploadInitiateResponse multipartUploadInitiateResponse = new
           MultipartUploadInitiateResponse();
@@ -1131,7 +1149,8 @@ public class ObjectEndpoint extends EndpointBase {
       String destKey, String destBucket,
       ReplicationConfig replication,
       Map<String, String> metadata,
-      PerformanceStringBuilder perf, long startNanos)
+      PerformanceStringBuilder perf, long startNanos,
+      Map<String, String> tags)
       throws IOException {
     long copyLength;
     if (datastreamEnabled && !(replication != null &&
@@ -1140,11 +1159,11 @@ public class ObjectEndpoint extends EndpointBase {
       perf.appendStreamMode();
       copyLength = ObjectEndpointStreaming
           .copyKeyWithStream(volume.getBucket(destBucket), destKey, srcKeyLen,
-              chunkSize, replication, metadata, src, perf, startNanos);
+              chunkSize, replication, metadata, src, perf, startNanos, tags);
     } else {
       try (OzoneOutputStream dest = getClientProtocol()
           .createKey(volume.getName(), destBucket, destKey, srcKeyLen,
-              replication, metadata)) {
+              replication, metadata, tags)) {
         long metadataLatencyNs =
             getMetrics().updateCopyKeyMetadataStats(startNanos);
         perf.appendMetaLatencyNanos(metadataLatencyNs);
@@ -1199,6 +1218,23 @@ public class ObjectEndpoint extends EndpointBase {
       }
       long sourceKeyLen = sourceKeyDetails.getDataSize();
 
+      // Object tagging in copyObject with tagging directive
+      Map<String, String> tags;
+      String tagCopyDirective = headers.getHeaderString(TAG_DIRECTIVE_HEADER);
+      if (StringUtils.isEmpty(tagCopyDirective) || 
tagCopyDirective.equals(CopyDirective.COPY.name())) {
+        // Tag-set will be copied from the source directly
+        tags = sourceKeyDetails.getTags();
+      } else if (tagCopyDirective.equals(CopyDirective.REPLACE.name())) {
+        // Replace the tags with the tags from the request headers
+        tags = getTaggingFromHeaders(headers);
+      } else {
+        OS3Exception ex = newError(INVALID_ARGUMENT, tagCopyDirective);
+        ex.setErrorMessage("An error occurred (InvalidArgument) " +
+            "when calling the CopyObject operation: " +
+            "The tagging copy directive specified is invalid. Valid values are 
COPY or REPLACE.");
+        throw ex;
+      }
+
       // Custom metadata in copyObject with metadata directive
       Map<String, String> customMetadata;
       String metadataCopyDirective = 
headers.getHeaderString(CUSTOM_METADATA_COPY_DIRECTIVE_HEADER);
@@ -1212,7 +1248,7 @@ public class ObjectEndpoint extends EndpointBase {
         OS3Exception ex = newError(INVALID_ARGUMENT, metadataCopyDirective);
         ex.setErrorMessage("An error occurred (InvalidArgument) " +
             "when calling the CopyObject operation: " +
-            "The metadata directive specified is invalid. Valid values are 
COPY or REPLACE.");
+            "The metadata copy directive specified is invalid. Valid values 
are COPY or REPLACE.");
         throw ex;
       }
 
@@ -1221,7 +1257,7 @@ public class ObjectEndpoint extends EndpointBase {
         getMetrics().updateCopyKeyMetadataStats(startNanos);
         sourceDigestInputStream = new DigestInputStream(src, 
getMessageDigestInstance());
         copy(volume, sourceDigestInputStream, sourceKeyLen, destkey, 
destBucket, replicationConfig,
-                customMetadata, perf, startNanos);
+                customMetadata, perf, startNanos, tags);
       }
 
       final OzoneKeyDetails destKeyDetails = getClientProtocol().getKeyDetails(
diff --git 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
index b916fc111d..cb9499aa20 100644
--- 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
+++ 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
@@ -122,11 +122,12 @@ final class ObjectEndpointStreaming {
       int bufferSize,
       ReplicationConfig replicationConfig,
       Map<String, String> keyMetadata,
-      DigestInputStream body, PerformanceStringBuilder perf, long startNanos)
+      DigestInputStream body, PerformanceStringBuilder perf, long startNanos,
+      Map<String, String> tags)
       throws IOException {
     long writeLen;
     try (OzoneDataStreamOutput streamOutput = bucket.createStreamKey(keyPath,
-        length, replicationConfig, keyMetadata)) {
+        length, replicationConfig, keyMetadata, tags)) {
       long metadataLatencyNs =
           METRICS.updateCopyKeyMetadataStats(startNanos);
       writeLen = writeToStreamOutput(streamOutput, body, bufferSize, length);
diff --git 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java
 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java
index 763c2d6be5..42c044086b 100644
--- 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java
+++ 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java
@@ -139,6 +139,12 @@ public final class S3ErrorTable {
       "BucketAlreadyExists", "The requested bucket name is not available" +
       " as it already exists.", HTTP_CONFLICT);
 
+  public static final OS3Exception INVALID_TAG = new OS3Exception(
+      "InvalidTag", "Your request contains tag input that is not valid.", 
HTTP_BAD_REQUEST);
+
+  public static final OS3Exception NO_SUCH_TAG_SET = new OS3Exception(
+      "NoSuchTagSet", "The specified tag does not exist.", HTTP_NOT_FOUND);
+
   public static OS3Exception newError(OS3Exception e, String resource) {
     return newError(e, resource, null);
   }
diff --git 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/S3Consts.java
 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/S3Consts.java
index 3b38ff03c4..3a29bac226 100644
--- 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/S3Consts.java
+++ 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/S3Consts.java
@@ -70,6 +70,18 @@ public final class S3Consts {
   public static final String DECODED_CONTENT_LENGTH_HEADER =
       "x-amz-decoded-content-length";
 
+  // Constants related to S3 tags
+  public static final String TAG_HEADER = "x-amz-tagging";
+  public static final String TAG_DIRECTIVE_HEADER = "x-amz-tagging-directive";
+  public static final String TAG_COUNT_HEADER = "x-amz-tagging-count";
+
+  public static final int TAG_NUM_LIMIT = 10;
+  public static final int TAG_KEY_LENGTH_LIMIT = 128;
+  public static final int TAG_VALUE_LENGTH_LIMIT = 256;
+  // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_control_S3Tag.html
+  // Also see 
https://docs.aws.amazon.com/directoryservice/latest/devguide/API_Tag.html for 
Java regex equivalent
+  public static final Pattern TAG_REGEX_PATTERN = 
Pattern.compile("^([\\p{L}\\p{Z}\\p{N}_.:/=+\\-]*)$");
+
   /**
    * Copy directive for metadata and tags.
    */
diff --git 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
index bc562d5d93..c675a9ba6a 100644
--- 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
+++ 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
@@ -227,6 +227,16 @@ public class ClientProtocolStub implements ClientProtocol {
         .createKey(keyName, size, replicationConfig, metadata);
   }
 
+  @Override
+  public OzoneOutputStream createKey(String volumeName, String bucketName,
+                                     String keyName, long size,
+                                     ReplicationConfig replicationConfig,
+                                     Map<String, String> metadata,
+                                     Map<String, String> tags) throws 
IOException {
+    return getBucket(volumeName, bucketName)
+        .createKey(keyName, size, replicationConfig, metadata, tags);
+  }
+
   @Override
   public OzoneInputStream getKey(String volumeName, String bucketName,
                                  String keyName) throws IOException {
@@ -322,6 +332,14 @@ public class ClientProtocolStub implements ClientProtocol {
         .initiateMultipartUpload(keyName, replicationConfig, metadata);
   }
 
+  @Override
+  public OmMultipartInfo initiateMultipartUpload(String volumeName,
+         String bucketName, String keyName, ReplicationConfig 
replicationConfig,
+         Map<String, String> metadata, Map<String, String> tags) throws 
IOException {
+    return getBucket(volumeName, bucketName)
+        .initiateMultipartUpload(keyName, replicationConfig, metadata, tags);
+  }
+
   @Override
   public OzoneOutputStream createMultipartKey(String volumeName,
                                               String bucketName, String 
keyName,
@@ -637,6 +655,14 @@ public class ClientProtocolStub implements ClientProtocol {
     return null;
   }
 
+  @Override
+  public OzoneDataStreamOutput createStreamKey(
+      String volumeName, String bucketName, String keyName, long size,
+      ReplicationConfig replicationConfig, Map<String, String> metadata,
+      Map<String, String> tags) throws IOException {
+    return null;
+  }
+
   @Override
   public OzoneDataStreamOutput createMultipartStreamKey(
       String volumeName, String bucketName, String keyName, long size,
diff --git 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index db74bd562c..22b002945e 100644
--- 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++ 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@ -123,7 +123,8 @@ public final class OzoneBucketStub extends OzoneBucket {
 
   @Override
   public OzoneOutputStream createKey(String key, long size,
-      ReplicationConfig rConfig, Map<String, String> metadata)
+      ReplicationConfig rConfig, Map<String, String> metadata,
+      Map<String, String> tags)
       throws IOException {
     assertDoesNotExist(key + "/");
 
@@ -148,7 +149,8 @@ public final class OzoneBucketStub extends OzoneBucket {
                 System.currentTimeMillis(),
                 new ArrayList<>(), finalReplicationCon, metadata, null,
                 () -> readKey(key), true,
-                UserGroupInformation.getCurrentUser().getShortUserName()
+                UserGroupInformation.getCurrentUser().getShortUserName(),
+                tags
             ));
             super.close();
           }
@@ -160,7 +162,8 @@ public final class OzoneBucketStub extends OzoneBucket {
   @Override
   public OzoneDataStreamOutput createStreamKey(String key, long size,
                                                ReplicationConfig rConfig,
-                                               Map<String, String> keyMetadata)
+                                               Map<String, String> keyMetadata,
+                                               Map<String, String> tags)
       throws IOException {
     assertDoesNotExist(key + "/");
 
@@ -188,7 +191,8 @@ public final class OzoneBucketStub extends OzoneBucket {
                 System.currentTimeMillis(),
                 new ArrayList<>(), rConfig, objectMetadata, null,
                 null, false,
-                UserGroupInformation.getCurrentUser().getShortUserName()
+                UserGroupInformation.getCurrentUser().getShortUserName(),
+                tags
             ));
           }
 
@@ -281,7 +285,8 @@ public final class OzoneBucketStub extends OzoneBucket {
           ozoneKeyDetails.getReplicationConfig(),
           ozoneKeyDetails.getMetadata(),
           ozoneKeyDetails.isFile(),
-          ozoneKeyDetails.getOwner());
+          ozoneKeyDetails.getOwner(),
+          ozoneKeyDetails.getTags());
     } else {
       throw new OMException(ResultCodes.KEY_NOT_FOUND);
     }
@@ -376,10 +381,10 @@ public final class OzoneBucketStub extends OzoneBucket {
 
   @Override
   public OmMultipartInfo initiateMultipartUpload(String keyName,
-       ReplicationConfig config, Map<String, String> metadata)
+       ReplicationConfig config, Map<String, String> metadata, Map<String, 
String> tags)
       throws IOException {
     String uploadID = UUID.randomUUID().toString();
-    keyToMultipartUpload.put(keyName, new MultipartInfoStub(uploadID, 
metadata));
+    keyToMultipartUpload.put(keyName, new MultipartInfoStub(uploadID, 
metadata, tags));
     return new OmMultipartInfo(getVolumeName(), getName(), keyName, uploadID);
   }
 
@@ -450,7 +455,8 @@ public final class OzoneBucketStub extends OzoneBucket {
           new ArrayList<>(), getReplicationConfig(),
           keyToMultipartUpload.get(key).getMetadata(), null,
           () -> readKey(key), true,
-          UserGroupInformation.getCurrentUser().getShortUserName()
+          UserGroupInformation.getCurrentUser().getShortUserName(),
+          keyToMultipartUpload.get(key).getTags()
       ));
     }
 
@@ -599,7 +605,8 @@ public final class OzoneBucketStub extends OzoneBucket {
         System.currentTimeMillis(),
         new ArrayList<>(), replicationConfig, new HashMap<>(), null,
         () -> readKey(keyName), false,
-        UserGroupInformation.getCurrentUser().getShortUserName()));
+        UserGroupInformation.getCurrentUser().getShortUserName(),
+        Collections.emptyMap()));
   }
 
   private void assertDoesNotExist(String keyName) throws OMException {
@@ -673,10 +680,13 @@ public final class OzoneBucketStub extends OzoneBucket {
 
     private final String uploadId;
     private final Map<String, String> metadata;
+    private final Map<String, String> tags;
 
-    MultipartInfoStub(String uploadId, Map<String, String> metadata) {
+    MultipartInfoStub(String uploadId, Map<String, String> metadata,
+                      Map<String, String> tags) {
       this.uploadId = uploadId;
       this.metadata = metadata;
+      this.tags = tags;
     }
 
     public String getUploadId() {
@@ -686,6 +696,10 @@ public final class OzoneBucketStub extends OzoneBucket {
     public Map<String, String> getMetadata() {
       return metadata;
     }
+
+    public Map<String, String> getTags() {
+      return tags;
+    }
   }
 
 }
diff --git 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectGet.java
 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectGet.java
index 91e29cadc8..8cf8da95cf 100644
--- 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectGet.java
+++ 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectGet.java
@@ -25,6 +25,7 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MultivaluedHashMap;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriInfo;
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.time.format.DateTimeFormatter;
 
@@ -33,7 +34,6 @@ import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientStub;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
-import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.s3.exception.OS3Exception;
 
 import org.apache.commons.io.IOUtils;
@@ -44,6 +44,9 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NO_SUCH_KEY;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.RANGE_HEADER;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_COUNT_HEADER;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_HEADER;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.mockito.Mockito.doReturn;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -55,7 +58,10 @@ import static org.mockito.Mockito.when;
  */
 public class TestObjectGet {
 
-  public static final String CONTENT = "0123456789";
+  private static final String CONTENT = "0123456789";
+  private static final String BUCKET_NAME = "b1";
+  private static final String KEY_NAME = "key1";
+  private static final String KEY_WITH_TAG = "keyWithTag";
   public static final String CONTENT_TYPE1 = "video/mp4";
   public static final String CONTENT_TYPE2 = "text/html; charset=UTF-8";
   public static final String CONTENT_LANGUAGE1 = "en-CA";
@@ -76,15 +82,10 @@ public class TestObjectGet {
   private ContainerRequestContext context;
 
   @BeforeEach
-  public void init() throws IOException {
+  public void init() throws OS3Exception, IOException {
     //GIVEN
     client = new OzoneClientStub();
-    client.getObjectStore().createS3Bucket("b1");
-    OzoneBucket bucket = client.getObjectStore().getS3Bucket("b1");
-    OzoneOutputStream keyStream =
-        bucket.createKey("key1", CONTENT.getBytes(UTF_8).length);
-    keyStream.write(CONTENT.getBytes(UTF_8));
-    keyStream.close();
+    client.getObjectStore().createS3Bucket(BUCKET_NAME);
 
     rest = new ObjectEndpoint();
     rest.setClient(client);
@@ -92,6 +93,14 @@ public class TestObjectGet {
     headers = mock(HttpHeaders.class);
     rest.setHeaders(headers);
 
+    ByteArrayInputStream body = new 
ByteArrayInputStream(CONTENT.getBytes(UTF_8));
+    rest.put(BUCKET_NAME, KEY_NAME, CONTENT.length(),
+        1, null, body);
+    // Create a key with object tags
+    
when(headers.getHeaderString(TAG_HEADER)).thenReturn("tag1=value1&tag2=value2");
+    rest.put(BUCKET_NAME, KEY_WITH_TAG, CONTENT.length(),
+        1, null, body);
+
     context = mock(ContainerRequestContext.class);
     when(context.getUriInfo()).thenReturn(mock(UriInfo.class));
     when(context.getUriInfo().getQueryParameters())
@@ -102,12 +111,12 @@ public class TestObjectGet {
   @Test
   public void get() throws IOException, OS3Exception {
     //WHEN
-    Response response = rest.get("b1", "key1", 0, null, 0, null);
+    Response response = rest.get(BUCKET_NAME, KEY_NAME, 0, null, 0, null);
 
     //THEN
     OzoneInputStream ozoneInputStream =
-        client.getObjectStore().getS3Bucket("b1")
-            .readKey("key1");
+        client.getObjectStore().getS3Bucket(BUCKET_NAME)
+            .readKey(KEY_NAME);
     String keyContent =
         IOUtils.toString(ozoneInputStream, UTF_8);
 
@@ -118,13 +127,35 @@ public class TestObjectGet {
     DateTimeFormatter.RFC_1123_DATE_TIME
         .parse(response.getHeaderString("Last-Modified"));
 
+    assertNull(response.getHeaderString(TAG_COUNT_HEADER));
+  }
+
+  @Test
+  public void getKeyWithTag() throws IOException, OS3Exception {
+    //WHEN
+    Response response = rest.get(BUCKET_NAME, KEY_WITH_TAG, 0, null, 0, null);
+
+    //THEN
+    OzoneInputStream ozoneInputStream =
+        client.getObjectStore().getS3Bucket(BUCKET_NAME)
+            .readKey(KEY_NAME);
+    String keyContent =
+        IOUtils.toString(ozoneInputStream, UTF_8);
+
+    assertEquals(CONTENT, keyContent);
+    assertEquals("" + keyContent.length(),
+        response.getHeaderString("Content-Length"));
+
+    DateTimeFormatter.RFC_1123_DATE_TIME
+        .parse(response.getHeaderString("Last-Modified"));
+    assertEquals("2", response.getHeaderString(TAG_COUNT_HEADER));
   }
 
   @Test
   public void inheritRequestHeader() throws IOException, OS3Exception {
     setDefaultHeader();
 
-    Response response = rest.get("b1", "key1", 0, null, 0, null);
+    Response response = rest.get(BUCKET_NAME, KEY_NAME, 0, null, 0, null);
 
     assertEquals(CONTENT_TYPE1,
         response.getHeaderString("Content-Type"));
@@ -157,7 +188,7 @@ public class TestObjectGet {
 
     when(context.getUriInfo().getQueryParameters())
         .thenReturn(queryParameter);
-    Response response = rest.get("b1", "key1", 0, null, 0, null);
+    Response response = rest.get(BUCKET_NAME, KEY_NAME, 0, null, 0, null);
 
     assertEquals(CONTENT_TYPE2,
         response.getHeaderString("Content-Type"));
@@ -178,24 +209,26 @@ public class TestObjectGet {
     Response response;
     when(headers.getHeaderString(RANGE_HEADER)).thenReturn("bytes=0-0");
 
-    response = rest.get("b1", "key1", 0, null, 0, null);
+    response = rest.get(BUCKET_NAME, KEY_NAME, 0, null, 0, null);
     assertEquals("1", response.getHeaderString("Content-Length"));
     assertEquals(String.format("bytes 0-0/%s", CONTENT.length()),
         response.getHeaderString("Content-Range"));
 
     when(headers.getHeaderString(RANGE_HEADER)).thenReturn("bytes=0-");
-    response = rest.get("b1", "key1", 0, null, 0, null);
+    response = rest.get(BUCKET_NAME, KEY_NAME, 0, null, 0, null);
     assertEquals(String.valueOf(CONTENT.length()),
         response.getHeaderString("Content-Length"));
     assertEquals(
         String.format("bytes 0-%s/%s", CONTENT.length() - 1, CONTENT.length()),
         response.getHeaderString("Content-Range"));
+
+    assertNull(response.getHeaderString(TAG_COUNT_HEADER));
   }
 
   @Test
   public void getStatusCode() throws IOException, OS3Exception {
     Response response;
-    response = rest.get("b1", "key1", 0, null, 0, null);
+    response = rest.get(BUCKET_NAME, KEY_NAME, 0, null, 0, null);
     assertEquals(response.getStatus(),
         Response.Status.OK.getStatusCode());
 
@@ -203,9 +236,10 @@ public class TestObjectGet {
     // The 206 (Partial Content) status code indicates that the server is
     //   successfully fulfilling a range request for the target resource
     when(headers.getHeaderString(RANGE_HEADER)).thenReturn("bytes=0-1");
-    response = rest.get("b1", "key1", 0, null, 0, null);
+    response = rest.get(BUCKET_NAME, KEY_NAME, 0, null, 0, null);
     assertEquals(response.getStatus(),
         Response.Status.PARTIAL_CONTENT.getStatusCode());
+    assertNull(response.getHeaderString(TAG_COUNT_HEADER));
   }
 
   private void setDefaultHeader() {
@@ -227,17 +261,16 @@ public class TestObjectGet {
   public void testGetWhenKeyIsDirectoryAndDoesNotEndWithASlash()
       throws IOException {
     // GIVEN
-    final String bucketName = "b1";
     final String keyPath = "keyDir";
     OzoneConfiguration config = new OzoneConfiguration();
     config.set(OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED, "true");
     rest.setOzoneConfiguration(config);
-    OzoneBucket bucket = client.getObjectStore().getS3Bucket(bucketName);
+    OzoneBucket bucket = client.getObjectStore().getS3Bucket(BUCKET_NAME);
     bucket.createDirectory(keyPath);
 
     // WHEN
     final OS3Exception ex = assertThrows(OS3Exception.class,
-            () -> rest.get(bucketName, keyPath, 0, null, 0, null));
+            () -> rest.get(BUCKET_NAME, keyPath, 0, null, 0, null));
 
     // THEN
     assertEquals(NO_SUCH_KEY.getCode(), ex.getCode());
diff --git 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
index abae489b41..8cde144a37 100644
--- 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
+++ 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
@@ -23,6 +23,7 @@ package org.apache.hadoop.ozone.s3.endpoint;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Map;
 import java.util.stream.Stream;
 import java.io.OutputStream;
 import java.security.MessageDigest;
@@ -32,6 +33,7 @@ import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -56,13 +58,21 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_ARGUMENT;
+import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_TAG;
 import static 
org.apache.hadoop.ozone.s3.util.S3Consts.CUSTOM_METADATA_COPY_DIRECTIVE_HEADER;
 import static 
org.apache.hadoop.ozone.s3.util.S3Consts.CUSTOM_METADATA_HEADER_PREFIX;
 import static 
org.apache.hadoop.ozone.s3.util.S3Consts.DECODED_CONTENT_LENGTH_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_DIRECTIVE_HEADER;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_HEADER;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_KEY_LENGTH_LIMIT;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_NUM_LIMIT;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_VALUE_LENGTH_LIMIT;
 import static org.apache.hadoop.ozone.s3.util.S3Utils.urlEncode;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -160,6 +170,7 @@ class TestObjectPut {
     assertEquals(replication, keyDetails.getReplicationConfig());
     assertNotNull(keyDetails.getMetadata());
     assertThat(keyDetails.getMetadata().get(OzoneConsts.ETAG)).isNotEmpty();
+    assertThat(keyDetails.getTags()).isEmpty();
   }
 
   @Test
@@ -193,6 +204,129 @@ class TestObjectPut {
     assertEquals(15, getKeyDataSize());
   }
 
+  @Test
+  public void testPutObjectWithTags() throws IOException, OS3Exception {
+    HttpHeaders headersWithTags = Mockito.mock(HttpHeaders.class);
+    
when(headersWithTags.getHeaderString(TAG_HEADER)).thenReturn("tag1=value1&tag2=value2");
+
+    ByteArrayInputStream body =
+        new ByteArrayInputStream(CONTENT.getBytes(UTF_8));
+    objectEndpoint.setHeaders(headersWithTags);
+
+    Response response = objectEndpoint.put(BUCKET_NAME, KEY_NAME, 
CONTENT.length(),
+        1, null, body);
+
+    assertEquals(200, response.getStatus());
+
+    OzoneKeyDetails keyDetails =
+        clientStub.getObjectStore().getS3Bucket(BUCKET_NAME).getKey(KEY_NAME);
+    Map<String, String> tags = keyDetails.getTags();
+    assertEquals(2, tags.size());
+    assertEquals("value1", tags.get("tag1"));
+    assertEquals("value2", tags.get("tag2"));
+  }
+
+  @Test
+  public void testPutObjectWithOnlyTagKey() throws Exception {
+    ByteArrayInputStream body =
+        new ByteArrayInputStream(CONTENT.getBytes(UTF_8));
+    HttpHeaders headerWithOnlyTagKey = Mockito.mock(HttpHeaders.class);
+    // Try to send with only the key (no value)
+    when(headerWithOnlyTagKey.getHeaderString(TAG_HEADER)).thenReturn("tag1");
+    objectEndpoint.setHeaders(headerWithOnlyTagKey);
+
+    try {
+      objectEndpoint.put(BUCKET_NAME, KEY_NAME, CONTENT.length(),
+          1, null, body);
+      fail("request with invalid query param should fail");
+    } catch (OS3Exception ex) {
+      assertEquals(INVALID_TAG.getCode(), ex.getCode());
+      assertThat(ex.getErrorMessage()).contains("Some tag values are not 
specified");
+      assertEquals(INVALID_TAG.getHttpCode(), ex.getHttpCode());
+    }
+  }
+
+  @Test
+  public void testPutObjectWithDuplicateTagKey() throws Exception {
+    ByteArrayInputStream body =
+        new ByteArrayInputStream(CONTENT.getBytes(UTF_8));
+    HttpHeaders headersWithDuplicateTagKey = Mockito.mock(HttpHeaders.class);
+    
when(headersWithDuplicateTagKey.getHeaderString(TAG_HEADER)).thenReturn("tag1=value1&tag1=value2");
+    objectEndpoint.setHeaders(headersWithDuplicateTagKey);
+    try {
+      objectEndpoint.put(BUCKET_NAME, KEY_NAME, CONTENT.length(),
+          1, null, body);
+      fail("request with duplicate tag key should fail");
+    } catch (OS3Exception ex) {
+      assertEquals(INVALID_TAG.getCode(), ex.getCode());
+      assertThat(ex.getErrorMessage()).contains("There are tags with duplicate 
tag keys");
+      assertEquals(INVALID_TAG.getHttpCode(), ex.getHttpCode());
+    }
+  }
+
+  @Test
+  public void testPutObjectWithLongTagKey() throws Exception {
+    ByteArrayInputStream body =
+        new ByteArrayInputStream(CONTENT.getBytes(UTF_8));
+    HttpHeaders headersWithLongTagKey = Mockito.mock(HttpHeaders.class);
+    String longTagKey = StringUtils.repeat('k', TAG_KEY_LENGTH_LIMIT + 1);
+    
when(headersWithLongTagKey.getHeaderString(TAG_HEADER)).thenReturn(longTagKey + 
"=value1");
+    objectEndpoint.setHeaders(headersWithLongTagKey);
+    try {
+      objectEndpoint.put(BUCKET_NAME, KEY_NAME, CONTENT.length(),
+          1, null, body);
+      fail("request with tag key exceeding the length limit should fail");
+    } catch (OS3Exception ex) {
+      assertEquals(INVALID_TAG.getCode(), ex.getCode());
+      assertThat(ex.getErrorMessage()).contains("The tag key exceeds the 
maximum length");
+      assertEquals(INVALID_TAG.getHttpCode(), ex.getHttpCode());
+    }
+  }
+
+  @Test
+  public void testPutObjectWithLongTagValue() throws Exception {
+    ByteArrayInputStream body =
+        new ByteArrayInputStream(CONTENT.getBytes(UTF_8));
+    HttpHeaders headersWithLongTagValue = Mockito.mock(HttpHeaders.class);
+    objectEndpoint.setHeaders(headersWithLongTagValue);
+    String longTagValue = StringUtils.repeat('v', TAG_VALUE_LENGTH_LIMIT + 1);
+    
when(headersWithLongTagValue.getHeaderString(TAG_HEADER)).thenReturn("tag1=" + 
longTagValue);
+    try {
+      objectEndpoint.put(BUCKET_NAME, KEY_NAME, CONTENT.length(),
+          1, null, body);
+      fail("request with tag value exceeding the length limit should fail");
+    } catch (OS3Exception ex) {
+      assertEquals(INVALID_TAG.getCode(), ex.getCode());
+      assertThat(ex.getErrorMessage()).contains("The tag value exceeds the 
maximum length");
+      assertEquals(INVALID_TAG.getHttpCode(), ex.getHttpCode());
+    }
+  }
+
+  @Test
+  public void testPutObjectWithTooManyTags() throws Exception {
+    ByteArrayInputStream body =
+        new ByteArrayInputStream(CONTENT.getBytes(UTF_8));
+    HttpHeaders headersWithTooManyTags = Mockito.mock(HttpHeaders.class);
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < TAG_NUM_LIMIT + 1; i++) {
+      sb.append(String.format("tag%d=value%d", i, i));
+      if (i < TAG_NUM_LIMIT) {
+        sb.append("&");
+      }
+    }
+    
when(headersWithTooManyTags.getHeaderString(TAG_HEADER)).thenReturn(sb.toString());
+    objectEndpoint.setHeaders(headersWithTooManyTags);
+    try {
+      objectEndpoint.put(BUCKET_NAME, KEY_NAME, CONTENT.length(),
+          1, null, body);
+      fail("request with number of tags exceeding limit should fail");
+    } catch (OS3Exception ex) {
+      assertEquals(INVALID_TAG.getCode(), ex.getCode());
+      assertThat(ex.getErrorMessage()).contains("exceeded the maximum number 
of tags");
+      assertEquals(INVALID_TAG.getHttpCode(), ex.getHttpCode());
+    }
+  }
+
   private long getKeyDataSize() throws IOException {
     return clientStub.getObjectStore().getS3Bucket(BUCKET_NAME)
         .getKey(KEY_NAME).getDataSize();
@@ -354,7 +488,7 @@ class TestObjectPut {
         "test copy object failed");
     assertThat(e.getHttpCode()).isEqualTo(400);
     assertThat(e.getCode()).isEqualTo("InvalidArgument");
-    assertThat(e.getErrorMessage()).contains("The metadata directive specified 
is invalid");
+    assertThat(e.getErrorMessage()).contains("The metadata copy directive 
specified is invalid");
 
     
when(headers.getHeaderString(CUSTOM_METADATA_COPY_DIRECTIVE_HEADER)).thenReturn("COPY");
 
@@ -438,6 +572,99 @@ class TestObjectPut {
     }
   }
 
+  @Test
+  public void testCopyObjectWithTags() throws IOException, OS3Exception {
+    // Put object in to source bucket
+    HttpHeaders headersForPut = Mockito.mock(HttpHeaders.class);
+    
when(headersForPut.getHeaderString(TAG_HEADER)).thenReturn("tag1=value1&tag2=value2");
+    ByteArrayInputStream body =
+        new ByteArrayInputStream(CONTENT.getBytes(UTF_8));
+    objectEndpoint.setHeaders(headersForPut);
+
+    String sourceKeyName = "sourceKey";
+
+    Response putResponse = objectEndpoint.put(BUCKET_NAME, sourceKeyName,
+        CONTENT.length(), 1, null, body);
+    OzoneKeyDetails keyDetails =
+        
clientStub.getObjectStore().getS3Bucket(BUCKET_NAME).getKey(sourceKeyName);
+
+    assertEquals(200, putResponse.getStatus());
+    Map<String, String> tags = keyDetails.getTags();
+    assertEquals(2, tags.size());
+    assertEquals("value1", tags.get("tag1"));
+    assertEquals("value2", tags.get("tag2"));
+
+    // Copy object without x-amz-tagging-directive (default to COPY)
+    String destKey = "key=value/2";
+    HttpHeaders headersForCopy = Mockito.mock(HttpHeaders.class);
+    when(headersForCopy.getHeaderString(COPY_SOURCE_HEADER)).thenReturn(
+        BUCKET_NAME  + "/" + urlEncode(sourceKeyName));
+
+    objectEndpoint.setHeaders(headersForCopy);
+    Response copyResponse = objectEndpoint.put(DEST_BUCKET_NAME, destKey, 
CONTENT.length(), 1, null, body);
+
+    OzoneKeyDetails destKeyDetails = clientStub.getObjectStore()
+        .getS3Bucket(DEST_BUCKET_NAME).getKey(destKey);
+
+    assertEquals(200, copyResponse.getStatus());
+    Map<String, String> destKeyTags = destKeyDetails.getTags();
+
+    // Since the default directive is COPY, it will copy the source key's tags
+    // to the destination key
+    assertEquals(2, destKeyTags.size());
+    assertEquals("value1", destKeyTags.get("tag1"));
+    assertEquals("value2", destKeyTags.get("tag2"));
+
+    // Copy object with x-amz-tagging-directive = COPY
+    
when(headersForCopy.getHeaderString(TAG_DIRECTIVE_HEADER)).thenReturn("COPY");
+
+    // With x-amz-tagging-directive = COPY with a different x-amz-tagging
+    when(headersForCopy.getHeaderString(TAG_HEADER)).thenReturn("tag3=value3");
+    copyResponse = objectEndpoint.put(DEST_BUCKET_NAME, destKey, 
CONTENT.length(), 1, null, body);
+    assertEquals(200, copyResponse.getStatus());
+
+    destKeyDetails = clientStub.getObjectStore()
+        .getS3Bucket(DEST_BUCKET_NAME).getKey(destKey);
+    destKeyTags = destKeyDetails.getTags();
+
+    // Since the x-amz-tagging-directive is COPY, we ignore the x-amz-tagging
+    // header
+    assertEquals(2, destKeyTags.size());
+    assertEquals("value1", destKeyTags.get("tag1"));
+    assertEquals("value2", destKeyTags.get("tag2"));
+
+    // Copy object with x-amz-tagging-directive = REPLACE
+    
when(headersForCopy.getHeaderString(TAG_DIRECTIVE_HEADER)).thenReturn("REPLACE");
+    copyResponse = objectEndpoint.put(DEST_BUCKET_NAME, destKey, 
CONTENT.length(), 1, null, body);
+    assertEquals(200, copyResponse.getStatus());
+
+    destKeyDetails = clientStub.getObjectStore()
+        .getS3Bucket(DEST_BUCKET_NAME).getKey(destKey);
+    destKeyTags = destKeyDetails.getTags();
+
+    // Since the x-amz-tagging-directive is REPLACE, we replace the source key
+    // tags with the one specified in the copy request
+    assertEquals(1, destKeyTags.size());
+    assertEquals("value3", destKeyTags.get("tag3"));
+    assertThat(destKeyTags).doesNotContainKeys("tag1", "tag2");
+  }
+
+  @Test
+  public void testCopyObjectWithInvalidTagCopyDirective() throws Exception {
+    ByteArrayInputStream body =
+        new ByteArrayInputStream(CONTENT.getBytes(UTF_8));
+    // Copy object with invalid x-amz-tagging-directive
+    HttpHeaders headersForCopy = Mockito.mock(HttpHeaders.class);
+    
when(headersForCopy.getHeaderString(TAG_DIRECTIVE_HEADER)).thenReturn("INVALID");
+    try {
+      objectEndpoint.put(DEST_BUCKET_NAME, "somekey", CONTENT.length(), 1, 
null, body);
+    } catch (OS3Exception ex) {
+      assertEquals(INVALID_ARGUMENT.getCode(), ex.getCode());
+      assertThat(ex.getErrorMessage()).contains("The tagging copy directive 
specified is invalid");
+      assertEquals(INVALID_ARGUMENT.getHttpCode(), ex.getHttpCode());
+    }
+  }
+
   @Test
   void testInvalidStorageType() {
     ByteArrayInputStream body =
diff --git 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPermissionCheck.java
 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPermissionCheck.java
index ec262cdf21..04551ac7cc 100644
--- 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPermissionCheck.java
+++ 
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPermissionCheck.java
@@ -249,7 +249,7 @@ public class TestPermissionCheck {
     when(objectStore.getS3Volume()).thenReturn(volume);
     when(volume.getBucket("bucketName")).thenReturn(bucket);
     doThrow(exception).when(clientProtocol).createKey(
-            anyString(), anyString(), anyString(), anyLong(), any(), any());
+            anyString(), anyString(), anyString(), anyLong(), any(), anyMap(), 
anyMap());
     ObjectEndpoint objectEndpoint = new ObjectEndpoint();
     objectEndpoint.setClient(client);
     objectEndpoint.setHeaders(headers);
@@ -279,7 +279,7 @@ public class TestPermissionCheck {
   @Test
   public void testMultiUploadKey() throws IOException {
     when(objectStore.getS3Bucket(anyString())).thenReturn(bucket);
-    doThrow(exception).when(bucket).initiateMultipartUpload(anyString(), 
any(), anyMap());
+    doThrow(exception).when(bucket).initiateMultipartUpload(anyString(), 
any(), anyMap(), anyMap());
     ObjectEndpoint objectEndpoint = new ObjectEndpoint();
     objectEndpoint.setClient(client);
     objectEndpoint.setHeaders(headers);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to