This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 09364a95f84 [improve] Upgrade to Oxia client 0.2.0 (#22663)
09364a95f84 is described below

commit 09364a95f8429b12a5951d4d1ff45766b13e92cb
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Tue May 7 07:13:45 2024 -0700

    [improve] Upgrade to Oxia client 0.2.0 (#22663)
---
 distribution/licenses/LICENSE-Reactive-gRPC.txt    | 29 ------------
 distribution/server/src/assemble/LICENSE.bin.txt   |  9 +---
 pom.xml                                            |  2 +-
 .../metadata/impl/oxia/OxiaMetadataStore.java      | 53 ++++++++++++----------
 4 files changed, 33 insertions(+), 60 deletions(-)

diff --git a/distribution/licenses/LICENSE-Reactive-gRPC.txt 
b/distribution/licenses/LICENSE-Reactive-gRPC.txt
deleted file mode 100644
index bc589401e7b..00000000000
--- a/distribution/licenses/LICENSE-Reactive-gRPC.txt
+++ /dev/null
@@ -1,29 +0,0 @@
-BSD 3-Clause License
-
-Copyright (c) 2019, Salesforce.com, Inc.
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are met:
-
-* Redistributions of source code must retain the above copyright notice, this
-  list of conditions and the following disclaimer.
-
-* Redistributions in binary form must reproduce the above copyright notice,
-  this list of conditions and the following disclaimer in the documentation
-  and/or other materials provided with the distribution.
-
-* Neither the name of the copyright holder nor the names of its
-  contributors may be used to endorse or promote products derived from
-  this software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
-FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
-CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
-OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index aec4df2a93a..818f389be88 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -481,12 +481,10 @@ The Apache Software License, Version 2.0
   * Prometheus
     - io.prometheus-simpleclient_httpserver-0.16.0.jar
   * Oxia
-    - io.streamnative.oxia-oxia-client-0.1.6.jar
-    - io.streamnative.oxia-oxia-client-metrics-api-0.1.6.jar
+    - io.streamnative.oxia-oxia-client-api-0.2.0.jar
+    - io.streamnative.oxia-oxia-client-0.2.0.jar
   * OpenHFT
     - net.openhft-zero-allocation-hashing-0.16.jar
-  * Project reactor
-    - io.projectreactor-reactor-core-3.5.2.jar
   * Java JSON WebTokens
     - io.jsonwebtoken-jjwt-api-0.11.1.jar
     - io.jsonwebtoken-jjwt-impl-0.11.1.jar
@@ -552,9 +550,6 @@ BSD 3-clause "New" or "Revised" License
  * JSR305 -- com.google.code.findbugs-jsr305-3.0.2.jar -- 
../licenses/LICENSE-JSR305.txt
  * JLine -- jline-jline-2.14.6.jar -- ../licenses/LICENSE-JLine.txt
  * JLine3 -- org.jline-jline-3.21.0.jar -- ../licenses/LICENSE-JLine.txt
- * Reactive gRPC
-    - com.salesforce.servicelibs-reactive-grpc-common-1.2.4.jar -- 
../licenses/LICENSE-Reactive-gRPC.txt
-    - com.salesforce.servicelibs-reactor-grpc-stub-1.2.4.jar -- 
../licenses/LICENSE-Reactive-gRPC.txt
 
 BSD 2-Clause License
  * HdrHistogram -- org.hdrhistogram-HdrHistogram-2.1.9.jar -- 
../licenses/LICENSE-HdrHistogram.txt
diff --git a/pom.xml b/pom.xml
index 8f7ae2ed1fc..92e021d1eaa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -249,7 +249,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <apache-http-client.version>4.5.13</apache-http-client.version>
     <apache-httpcomponents.version>4.4.15</apache-httpcomponents.version>
     <jetcd.version>0.7.5</jetcd.version>
-    <oxia.version>0.1.6</oxia.version>
+    <oxia.version>0.2.0</oxia.version>
     <snakeyaml.version>2.0</snakeyaml.version>
     <ant.version>1.10.12</ant.version>
     <seancfoley.ipaddress.version>5.3.3</seancfoley.ipaddress.version>
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 2ab744e2053..728bc1175b9 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -18,20 +18,23 @@
  */
 package org.apache.pulsar.metadata.impl.oxia;
 
-import io.streamnative.oxia.client.OxiaClientBuilder;
 import io.streamnative.oxia.client.api.AsyncOxiaClient;
 import io.streamnative.oxia.client.api.DeleteOption;
-import io.streamnative.oxia.client.api.KeyAlreadyExistsException;
 import io.streamnative.oxia.client.api.Notification;
+import io.streamnative.oxia.client.api.OxiaClientBuilder;
 import io.streamnative.oxia.client.api.PutOption;
 import io.streamnative.oxia.client.api.PutResult;
-import io.streamnative.oxia.client.api.UnexpectedVersionIdException;
 import io.streamnative.oxia.client.api.Version;
+import io.streamnative.oxia.client.api.exceptions.KeyAlreadyExistsException;
+import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException;
 import java.time.Duration;
+import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
@@ -69,7 +72,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
         this.synchronizer = 
Optional.ofNullable(metadataStoreConfig.getSynchronizer());
         identity = UUID.randomUUID().toString();
         client =
-                new OxiaClientBuilder(serviceAddress)
+                OxiaClientBuilder.create(serviceAddress)
                         .clientIdentifier(identity)
                         .namespace(namespace)
                         
.sessionTimeout(Duration.ofMillis(metadataStoreConfig.getSessionTimeoutMillis()))
@@ -153,14 +156,14 @@ public class OxiaMetadataStore extends 
AbstractMetadataStore {
         return getChildrenFromStore(path)
                 .thenCompose(
                         children -> {
-                            if (children.size() > 0) {
+                            if (!children.isEmpty()) {
                                 return CompletableFuture.failedFuture(
                                         new MetadataStoreException("Key '" + 
path + "' has children"));
                             } else {
-                                var delOption =
+                                Set<DeleteOption> delOption =
                                         expectedVersion
-                                                
.map(DeleteOption::ifVersionIdEquals)
-                                                
.orElse(DeleteOption.Unconditionally);
+                                                .map(v -> 
Collections.singleton(DeleteOption.IfVersionIdEquals(v)))
+                                                
.orElse(Collections.emptySet());
                                 CompletableFuture<Boolean> result = 
client.delete(path, delOption);
                                 return result
                                         .thenCompose(
@@ -205,20 +208,20 @@ public class OxiaMetadataStore extends 
AbstractMetadataStore {
                     } else {
                         actualPath = CompletableFuture.completedFuture(path);
                     }
-                    var versionCondition =
-                            expectedVersion
-                                    .map(
-                                            ver -> {
-                                                if (ver == -1) {
-                                                    return 
PutOption.IfRecordDoesNotExist;
-                                                }
-                                                return 
PutOption.ifVersionIdEquals(ver);
-                                            })
-                                    .orElse(PutOption.Unconditionally);
-                    var putOptions =
-                            options.contains(CreateOption.Ephemeral)
-                                    ? new PutOption[] 
{PutOption.AsEphemeralRecord, versionCondition}
-                                    : new PutOption[] {versionCondition};
+                    Set<PutOption> putOptions = new HashSet<>();
+                    expectedVersion
+                            .map(
+                                    ver -> {
+                                        if (ver == -1) {
+                                            return 
PutOption.IfRecordDoesNotExist;
+                                        }
+                                        return 
PutOption.IfVersionIdEquals(ver);
+                                    })
+                            .ifPresent(putOptions::add);
+
+                    if (options.contains(CreateOption.Ephemeral)) {
+                        putOptions.add(PutOption.AsEphemeralRecord);
+                    }
                     return actualPath
                             .thenCompose(
                                     aPath ->
@@ -242,6 +245,10 @@ public class OxiaMetadataStore extends 
AbstractMetadataStore {
         }
     }
 
+    private static final byte[] EMPTY_VALUE = new byte[0];
+    private static final Set<PutOption> IF_RECORD_DOES_NOT_EXIST =
+            Collections.singleton(PutOption.IfRecordDoesNotExist);
+
     private CompletableFuture<Void> createParents(String path) {
         var parent = parent(path);
         if (parent == null || parent.isEmpty()) {
@@ -254,7 +261,7 @@ public class OxiaMetadataStore extends 
AbstractMetadataStore {
                                 return CompletableFuture.completedFuture(null);
                             } else {
                                 return client
-                                        .put(parent, new byte[] {}, 
PutOption.IfRecordDoesNotExist)
+                                        .put(parent, EMPTY_VALUE, 
IF_RECORD_DOES_NOT_EXIST)
                                         .thenCompose(__ -> 
createParents(parent));
                             }
                         })

Reply via email to