This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 09364a95f84 [improve] Upgrade to Oxia client 0.2.0 (#22663) 09364a95f84 is described below commit 09364a95f8429b12a5951d4d1ff45766b13e92cb Author: Matteo Merli <mme...@apache.org> AuthorDate: Tue May 7 07:13:45 2024 -0700 [improve] Upgrade to Oxia client 0.2.0 (#22663) --- distribution/licenses/LICENSE-Reactive-gRPC.txt | 29 ------------ distribution/server/src/assemble/LICENSE.bin.txt | 9 +--- pom.xml | 2 +- .../metadata/impl/oxia/OxiaMetadataStore.java | 53 ++++++++++++---------- 4 files changed, 33 insertions(+), 60 deletions(-) diff --git a/distribution/licenses/LICENSE-Reactive-gRPC.txt b/distribution/licenses/LICENSE-Reactive-gRPC.txt deleted file mode 100644 index bc589401e7b..00000000000 --- a/distribution/licenses/LICENSE-Reactive-gRPC.txt +++ /dev/null @@ -1,29 +0,0 @@ -BSD 3-Clause License - -Copyright (c) 2019, Salesforce.com, Inc. -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - -* Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - -* Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -* Neither the name of the copyright holder nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index aec4df2a93a..818f389be88 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -481,12 +481,10 @@ The Apache Software License, Version 2.0 * Prometheus - io.prometheus-simpleclient_httpserver-0.16.0.jar * Oxia - - io.streamnative.oxia-oxia-client-0.1.6.jar - - io.streamnative.oxia-oxia-client-metrics-api-0.1.6.jar + - io.streamnative.oxia-oxia-client-api-0.2.0.jar + - io.streamnative.oxia-oxia-client-0.2.0.jar * OpenHFT - net.openhft-zero-allocation-hashing-0.16.jar - * Project reactor - - io.projectreactor-reactor-core-3.5.2.jar * Java JSON WebTokens - io.jsonwebtoken-jjwt-api-0.11.1.jar - io.jsonwebtoken-jjwt-impl-0.11.1.jar @@ -552,9 +550,6 @@ BSD 3-clause "New" or "Revised" License * JSR305 -- com.google.code.findbugs-jsr305-3.0.2.jar -- ../licenses/LICENSE-JSR305.txt * JLine -- jline-jline-2.14.6.jar -- ../licenses/LICENSE-JLine.txt * JLine3 -- org.jline-jline-3.21.0.jar -- ../licenses/LICENSE-JLine.txt - * Reactive gRPC - - com.salesforce.servicelibs-reactive-grpc-common-1.2.4.jar -- ../licenses/LICENSE-Reactive-gRPC.txt - - com.salesforce.servicelibs-reactor-grpc-stub-1.2.4.jar -- ../licenses/LICENSE-Reactive-gRPC.txt BSD 2-Clause License * HdrHistogram -- org.hdrhistogram-HdrHistogram-2.1.9.jar -- ../licenses/LICENSE-HdrHistogram.txt diff --git a/pom.xml b/pom.xml index 8f7ae2ed1fc..92e021d1eaa 100644 --- a/pom.xml +++ b/pom.xml @@ -249,7 +249,7 @@ flexible messaging model and an intuitive client API.</description> <apache-http-client.version>4.5.13</apache-http-client.version> <apache-httpcomponents.version>4.4.15</apache-httpcomponents.version> <jetcd.version>0.7.5</jetcd.version> - <oxia.version>0.1.6</oxia.version> + <oxia.version>0.2.0</oxia.version> <snakeyaml.version>2.0</snakeyaml.version> <ant.version>1.10.12</ant.version> <seancfoley.ipaddress.version>5.3.3</seancfoley.ipaddress.version> diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java index 2ab744e2053..728bc1175b9 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java @@ -18,20 +18,23 @@ */ package org.apache.pulsar.metadata.impl.oxia; -import io.streamnative.oxia.client.OxiaClientBuilder; import io.streamnative.oxia.client.api.AsyncOxiaClient; import io.streamnative.oxia.client.api.DeleteOption; -import io.streamnative.oxia.client.api.KeyAlreadyExistsException; import io.streamnative.oxia.client.api.Notification; +import io.streamnative.oxia.client.api.OxiaClientBuilder; import io.streamnative.oxia.client.api.PutOption; import io.streamnative.oxia.client.api.PutResult; -import io.streamnative.oxia.client.api.UnexpectedVersionIdException; import io.streamnative.oxia.client.api.Version; +import io.streamnative.oxia.client.api.exceptions.KeyAlreadyExistsException; +import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException; import java.time.Duration; +import java.util.Collections; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -69,7 +72,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore { this.synchronizer = Optional.ofNullable(metadataStoreConfig.getSynchronizer()); identity = UUID.randomUUID().toString(); client = - new OxiaClientBuilder(serviceAddress) + OxiaClientBuilder.create(serviceAddress) .clientIdentifier(identity) .namespace(namespace) .sessionTimeout(Duration.ofMillis(metadataStoreConfig.getSessionTimeoutMillis())) @@ -153,14 +156,14 @@ public class OxiaMetadataStore extends AbstractMetadataStore { return getChildrenFromStore(path) .thenCompose( children -> { - if (children.size() > 0) { + if (!children.isEmpty()) { return CompletableFuture.failedFuture( new MetadataStoreException("Key '" + path + "' has children")); } else { - var delOption = + Set<DeleteOption> delOption = expectedVersion - .map(DeleteOption::ifVersionIdEquals) - .orElse(DeleteOption.Unconditionally); + .map(v -> Collections.singleton(DeleteOption.IfVersionIdEquals(v))) + .orElse(Collections.emptySet()); CompletableFuture<Boolean> result = client.delete(path, delOption); return result .thenCompose( @@ -205,20 +208,20 @@ public class OxiaMetadataStore extends AbstractMetadataStore { } else { actualPath = CompletableFuture.completedFuture(path); } - var versionCondition = - expectedVersion - .map( - ver -> { - if (ver == -1) { - return PutOption.IfRecordDoesNotExist; - } - return PutOption.ifVersionIdEquals(ver); - }) - .orElse(PutOption.Unconditionally); - var putOptions = - options.contains(CreateOption.Ephemeral) - ? new PutOption[] {PutOption.AsEphemeralRecord, versionCondition} - : new PutOption[] {versionCondition}; + Set<PutOption> putOptions = new HashSet<>(); + expectedVersion + .map( + ver -> { + if (ver == -1) { + return PutOption.IfRecordDoesNotExist; + } + return PutOption.IfVersionIdEquals(ver); + }) + .ifPresent(putOptions::add); + + if (options.contains(CreateOption.Ephemeral)) { + putOptions.add(PutOption.AsEphemeralRecord); + } return actualPath .thenCompose( aPath -> @@ -242,6 +245,10 @@ public class OxiaMetadataStore extends AbstractMetadataStore { } } + private static final byte[] EMPTY_VALUE = new byte[0]; + private static final Set<PutOption> IF_RECORD_DOES_NOT_EXIST = + Collections.singleton(PutOption.IfRecordDoesNotExist); + private CompletableFuture<Void> createParents(String path) { var parent = parent(path); if (parent == null || parent.isEmpty()) { @@ -254,7 +261,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore { return CompletableFuture.completedFuture(null); } else { return client - .put(parent, new byte[] {}, PutOption.IfRecordDoesNotExist) + .put(parent, EMPTY_VALUE, IF_RECORD_DOES_NOT_EXIST) .thenCompose(__ -> createParents(parent)); } })