This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-7593 by this push:
     new 7828ba3ea2 HDDS-11260. [hsync] Add Ozone Manager protocol version (#7015)
7828ba3ea2 is described below

commit 7828ba3ea23885928e783051c4e367d4f19db15e
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Fri Aug 2 09:28:25 2024 -0700

    HDDS-11260. [hsync] Add Ozone Manager protocol version (#7015)
---
 .../apache/hadoop/ozone/OzoneManagerVersion.java   |  1 +
 .../hadoop/ozone/client/io/KeyOutputStream.java    | 18 ++++++++++++++
 .../ozone/client/protocol/ClientProtocol.java      | 24 +++++++++++++++++++
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  | 28 ++++++++++++++++++++--
 .../fs/ozone/BasicOzoneClientAdapterImpl.java      |  8 ++++---
 .../ozone/BasicRootedOzoneClientAdapterImpl.java   |  6 +++--
 .../hadoop/ozone/client/ClientProtocolStub.java    | 13 ++++++++++
 .../ozone/admin/om/ListOpenFilesSubCommand.java    | 10 ++++++++
 8 files changed, 101 insertions(+), 7 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java
index c53aca2f15..eec2ceeb5e 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java
@@ -43,6 +43,7 @@ public enum OzoneManagerVersion implements ComponentVersion {
   OBJECT_TAG(5, "OzoneManager version that supports object tags"),
 
   ATOMIC_REWRITE_KEY(6, "OzoneManager version that supports rewriting key as atomic operation"),
+  HBASE_SUPPORT(7, "OzoneManager version that supports HBase integration"),
 
   FUTURE_VERSION(-1, "Used internally in the client when the server side is "
       + " newer and an unknown server version has arrived to the client.");
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 649c2e7dcc..2e44539826 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.OzoneManagerVersion;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
@@ -107,6 +108,7 @@ public class KeyOutputStream extends OutputStream
    */
   private boolean atomicKeyCreation;
   private ContainerClientMetrics clientMetrics;
+  private OzoneManagerVersion ozoneManagerVersion;
 
   public KeyOutputStream(ReplicationConfig replicationConfig, BlockOutputStreamEntryPool blockOutputStreamEntryPool) {
     this.replication = replicationConfig;
@@ -157,6 +159,7 @@ public class KeyOutputStream extends OutputStream
     this.atomicKeyCreation = b.getAtomicKeyCreation();
     this.streamBufferArgs = b.getStreamBufferArgs();
     this.clientMetrics = b.getClientMetrics();
+    this.ozoneManagerVersion = b.ozoneManagerVersion;
   }
 
   /**
@@ -457,6 +460,11 @@ public class KeyOutputStream extends OutputStream
       throw new UnsupportedOperationException("The replication factor = "
           + replication.getRequiredNodes() + " <= 1");
     }
+    if (ozoneManagerVersion.compareTo(OzoneManagerVersion.HBASE_SUPPORT) < 0) {
+      throw new UnsupportedOperationException("Hsync API requires OM version "
+          + OzoneManagerVersion.HBASE_SUPPORT + " or later. Current OM version "
+          + ozoneManagerVersion);
+    }
     checkNotClosed();
     final long hsyncPos = writeOffset;
 
@@ -599,6 +607,7 @@ public class KeyOutputStream extends OutputStream
     private boolean atomicKeyCreation = false;
     private StreamBufferArgs streamBufferArgs;
     private Supplier<ExecutorService> executorServiceSupplier;
+    private OzoneManagerVersion ozoneManagerVersion;
 
     public String getMultipartUploadID() {
       return multipartUploadID;
@@ -721,6 +730,15 @@ public class KeyOutputStream extends OutputStream
       return executorServiceSupplier;
     }
 
+    public Builder setOmVersion(OzoneManagerVersion omVersion) {
+      this.ozoneManagerVersion = omVersion;
+      return this;
+    }
+
+    public OzoneManagerVersion getOmVersion() {
+      return ozoneManagerVersion;
+    }
+
     public KeyOutputStream build() {
       return new KeyOutputStream(this);
     }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 55985a18d3..68812a7eb4 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -50,6 +50,8 @@ import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.DeleteTenantState;
 import org.apache.hadoop.ozone.om.helpers.ErrorInfo;
+import org.apache.hadoop.ozone.om.helpers.LeaseKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
@@ -1313,4 +1315,26 @@ public interface ClientProtocol {
    * */
   void setTimes(OzoneObj obj, String keyName, long mtime, long atime)
       throws IOException;
+
+  /**
+   * Start the lease recovery of a file.
+   *
+   * @param volumeName - The volume name.
+   * @param bucketName - The bucket name.
+   * @param keyName - The key user want to recover.
+   * @param force - force recover the file.
+   * @return LeaseKeyInfo KeyInfo of file under recovery
+   * @throws IOException if an error occurs
+   */
+  LeaseKeyInfo recoverLease(String volumeName, String bucketName, String keyName, boolean force) throws IOException;
+
+  /**
+   * Recovery and commit a key. This will make the change from the client visible. The client
+   * is identified by the clientID.
+   *
+   * @param args the key to commit
+   * @param clientID the client identification
+   * @throws IOException
+   */
+  void recoverKey(OmKeyArgs args, long clientID) throws IOException;
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index ac0cf1d09c..1bad03c7d2 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -100,6 +100,7 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.DeleteTenantState;
 import org.apache.hadoop.ozone.om.helpers.ErrorInfo;
 import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
+import org.apache.hadoop.ozone.om.helpers.LeaseKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDeleteKeys;
@@ -336,7 +337,7 @@ public class RpcClient implements ClientProtocol {
     return xceiverClientManager;
   }
 
-  private OzoneManagerVersion getOmVersion(ServiceInfoEx info) {
+  public static OzoneManagerVersion getOmVersion(ServiceInfoEx info) {
     OzoneManagerVersion version = OzoneManagerVersion.CURRENT;
     for (ServiceInfo si : info.getServiceInfoList()) {
       if (si.getNodeType() == HddsProtos.NodeType.OM) {
@@ -2562,7 +2563,8 @@ public class RpcClient implements ClientProtocol {
         .setAtomicKeyCreation(isS3GRequest.get())
         .setClientMetrics(clientMetrics)
         .setExecutorServiceSupplier(writeExecutor)
-        .setStreamBufferArgs(streamBufferArgs);
+        .setStreamBufferArgs(streamBufferArgs)
+        .setOmVersion(omVersion);
   }
 
   @Override
@@ -2684,6 +2686,28 @@ public class RpcClient implements ClientProtocol {
     ozoneManagerClient.setTimes(builder.build(), mtime, atime);
   }
 
+  @Override
+  public LeaseKeyInfo recoverLease(String volumeName, String bucketName,
+                                   String keyName, boolean force)
+      throws IOException {
+    if (omVersion.compareTo(OzoneManagerVersion.HBASE_SUPPORT) < 0) {
+      throw new UnsupportedOperationException("Lease recovery API requires OM version "
+          + OzoneManagerVersion.HBASE_SUPPORT + " or later. Current OM version "
+          + omVersion);
+    }
+    return ozoneManagerClient.recoverLease(volumeName, bucketName, keyName, force);
+  }
+
+  @Override
+  public void recoverKey(OmKeyArgs args, long clientID) throws IOException {
+    if (omVersion.compareTo(OzoneManagerVersion.HBASE_SUPPORT) < 0) {
+      throw new UnsupportedOperationException("Lease recovery API requires OM version "
+          + OzoneManagerVersion.HBASE_SUPPORT + " or later. Current OM version "
+          + omVersion);
+    }
+    ozoneManagerClient.recoverKey(args, clientID);
+  }
+
   private static ExecutorService createThreadPoolExecutor(
        int corePoolSize, int maximumPoolSize, String threadNameFormat) {
     return new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index e4df4c242b..658685779e 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.ozone.client.OzoneKey;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
 import org.apache.hadoop.ozone.client.rpc.RpcClient;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -703,8 +704,8 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
     incrementCounter(Statistic.INVOCATION_RECOVER_FILE_PREPARE, 1);
 
     try {
-      return ozoneClient.getProxy().getOzoneManagerClient().recoverLease(
-          volume.getName(), bucket.getName(), pathStr, force);
+      ClientProtocol clientProtocol = ozoneClient.getProxy();
+      return clientProtocol.recoverLease(volume.getName(), bucket.getName(), pathStr, force);
     } catch (OMException ome) {
       if (ome.getResult() == NOT_A_FILE) {
         throw new FileNotFoundException("Path is not a file. " + ome.getMessage());
@@ -720,7 +721,8 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
   public void recoverFile(OmKeyArgs keyArgs) throws IOException {
     incrementCounter(Statistic.INVOCATION_RECOVER_FILE, 1);
 
-    ozoneClient.getProxy().getOzoneManagerClient().recoverKey(keyArgs, 0L);
+    ClientProtocol clientProtocol = ozoneClient.getProxy();
+    clientProtocol.recoverKey(keyArgs, 0L);
   }
 
   @Override
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index ab05dd69c3..76533e7774 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -1395,7 +1395,8 @@ public class BasicRootedOzoneClientAdapterImpl
 
     try {
       OzoneBucket bucket = getBucket(ofsPath, false);
-      return ozoneClient.getProxy().getOzoneManagerClient().recoverLease(
+      ClientProtocol clientProtocol = ozoneClient.getProxy();
+      return clientProtocol.recoverLease(
           bucket.getVolumeName(), bucket.getName(), ofsPath.getKeyName(), force);
     } catch (OMException ome) {
       if (ome.getResult() == NOT_A_FILE) {
@@ -1414,7 +1415,8 @@ public class BasicRootedOzoneClientAdapterImpl
   public void recoverFile(OmKeyArgs keyArgs) throws IOException {
     incrementCounter(Statistic.INVOCATION_RECOVER_FILE, 1);
 
-    ozoneClient.getProxy().getOzoneManagerClient().recoverKey(keyArgs, 0L);
+    ClientProtocol clientProtocol = ozoneClient.getProxy();
+    clientProtocol.recoverKey(keyArgs, 0L);
   }
 
   @Override
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
index bea831a065..21c3f8358f 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
 import org.apache.hadoop.ozone.om.helpers.DeleteTenantState;
 import org.apache.hadoop.ozone.om.helpers.ErrorInfo;
+import org.apache.hadoop.ozone.om.helpers.LeaseKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
@@ -768,4 +770,15 @@ public class ClientProtocolStub implements ClientProtocol {
       throws IOException {
   }
 
+  @Override
+  public LeaseKeyInfo recoverLease(String volumeName, String bucketName,
+      String keyName, boolean force) throws IOException {
+    return null;
+  }
+
+  @Override
+  public void recoverKey(OmKeyArgs args, long clientID) throws IOException {
+
+  }
+
 }
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/ListOpenFilesSubCommand.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/ListOpenFilesSubCommand.java
index 723a4ec402..221f9f9a1c 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/ListOpenFilesSubCommand.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/ListOpenFilesSubCommand.java
@@ -21,9 +21,12 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.cli.HddsVersionProvider;
 import org.apache.hadoop.hdds.server.JsonUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.OzoneManagerVersion;
+import org.apache.hadoop.ozone.client.rpc.RpcClient;
 import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import picocli.CommandLine;
 
@@ -112,6 +115,13 @@ public class ListOpenFilesSubCommand implements Callable<Void> {
 
     OzoneManagerProtocol ozoneManagerClient =
         parent.createOmClient(omServiceId, omHost, false);
+    ServiceInfoEx serviceInfoEx = ozoneManagerClient.getServiceInfo();
+    final OzoneManagerVersion omVersion = RpcClient.getOmVersion(serviceInfoEx);
+    if (omVersion.compareTo(OzoneManagerVersion.HBASE_SUPPORT) < 0) {
+      System.err.println("Error: This command requires OzoneManager version "
+          + OzoneManagerVersion.HBASE_SUPPORT.name() + " or later.");
+      return null;
+    }
 
     ListOpenFilesResult res =
         ozoneManagerClient.listOpenFiles(pathPrefix, limit, startItem);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to