This is an automated email from the ASF dual-hosted git repository.
snazy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new e1243482b NoSQL: Quarkus distributed cache invalidation (#3105)
e1243482b is described below
commit e1243482bc9be82f75903ca76b1b3efc1c2b7c0a
Author: Robert Stupp <[email protected]>
AuthorDate: Mon Nov 24 08:49:34 2025 +0100
NoSQL: Quarkus distributed cache invalidation (#3105)
Adds support for distributed NoSQL cache invalidation leveraging Quarkus.
---
LICENSE | 10 +
bom/build.gradle.kts | 1 +
gradle/projects.main.properties | 1 +
persistence/nosql/persistence/cdi/README.md | 24 +
.../cdi/quarkus-distcache/build.gradle.kts | 60 +++
.../nosql/quarkus/distcache/AddressResolver.java | 163 ++++++
.../quarkus/distcache/CacheInvalidationInfra.java | 34 ++
.../distcache/CacheInvalidationReceiver.java | 172 +++++++
.../quarkus/distcache/CacheInvalidationSender.java | 317 ++++++++++++
...QuarkusDistributedCacheInvalidationsConfig.java | 99 ++++
.../nosql/quarkus/distcache/ResolvConf.java | 127 +++++
.../nosql/quarkus/distcache/ServerInstanceId.java | 30 ++
.../nosql/quarkus/distcache/package-info.java | 35 ++
.../nosql/quarkus/distcache/HttpTestServer.java | 70 +++
.../quarkus/distcache/TestAddressResolver.java | 170 +++++++
.../distcache/TestCacheInvalidationReceiver.java | 186 +++++++
.../distcache/TestCacheInvalidationSender.java | 566 +++++++++++++++++++++
.../nosql/quarkus/distcache/TestResolvConf.java | 108 ++++
.../src/test/resources/logback-test.xml | 34 ++
19 files changed, 2207 insertions(+)
diff --git a/LICENSE b/LICENSE
index 8f50538f8..437a6849e 100644
--- a/LICENSE
+++ b/LICENSE
@@ -230,6 +230,16 @@ License: https://www.apache.org/licenses/LICENSE-2.0
--------------------------------------------------------------------------------
+This product includes code from Netty.
+
+*
persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/ResolvConf.java
+
+Copyright: Copyright © 2025 The Netty project
+Home page: https://netty.io/
+License: https://www.apache.org/licenses/LICENSE-2.0
+
+--------------------------------------------------------------------------------
+
This product includes code from OpenAPITool openapi-generator
* server-templates/formParams.mustache
diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts
index 9a2efa111..bab80f538 100644
--- a/bom/build.gradle.kts
+++ b/bom/build.gradle.kts
@@ -64,6 +64,7 @@ dependencies {
api(project(":polaris-persistence-nosql-benchmark"))
api(project(":polaris-persistence-nosql-correctness"))
api(project(":polaris-persistence-nosql-cdi-common"))
+ api(project(":polaris-persistence-nosql-cdi-quarkus-distcache"))
api(project(":polaris-persistence-nosql-cdi-weld"))
api(project(":polaris-persistence-nosql-standalone"))
api(project(":polaris-persistence-nosql-testextension"))
diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties
index 1cabe3e9f..488394b49 100644
--- a/gradle/projects.main.properties
+++ b/gradle/projects.main.properties
@@ -79,6 +79,7 @@
polaris-persistence-nosql-impl=persistence/nosql/persistence/impl
polaris-persistence-nosql-benchmark=persistence/nosql/persistence/benchmark
polaris-persistence-nosql-correctness=persistence/nosql/persistence/correctness
polaris-persistence-nosql-cdi-common=persistence/nosql/persistence/cdi/common
+polaris-persistence-nosql-cdi-quarkus-distcache=persistence/nosql/persistence/cdi/quarkus-distcache
polaris-persistence-nosql-cdi-weld=persistence/nosql/persistence/cdi/weld
polaris-persistence-nosql-standalone=persistence/nosql/persistence/standalone
polaris-persistence-nosql-testextension=persistence/nosql/persistence/testextension
diff --git a/persistence/nosql/persistence/cdi/README.md
b/persistence/nosql/persistence/cdi/README.md
index 8f9dc5545..2f353b7ff 100644
--- a/persistence/nosql/persistence/cdi/README.md
+++ b/persistence/nosql/persistence/cdi/README.md
@@ -39,3 +39,27 @@ The biggest difference between the Quarkus and Weld variants
is the way how data
There are also backend specific builders that leverage Quarkus extensions
for the respective
database backends.
The Quarkus variant also adds OpenTelemetry instrumentation to the `Backend`
instances.
+
+# Distributed cache invalidation (multiple Polaris nodes)
+
+Most persisted objects are immutable, which eliminates the need to explicitly
invalidate objects.
+
+Some specific object types are intentionally mutable.
+Consistency during write operations is guaranteed by using CAS operations on
those objects.
+Read operations, however, fetch through the cache.
+
+Reference pointers are mutable by design.
+For writing operations, the current value of the reference pointer is always
read from the
+backend database.
+Read operations, however, fetch the recent pointer via the cache.
+
+To keep the state for read operations up to date, the writing node sends the
information about
+the mutation to all other nodes via the distributed cache invalidation
mechanism.
+Short cache expiration times are used to mitigate the risk of missing cache
invalidation messages.
+
+In k8s this works out of the box, leveraging k8s name service mechanisms that
are able to resolve
+the set of IP addresses of all nodes in the cluster.
+Non-k8s deployments need to configure the DNS resolvable names or IP addresses
of all nodes in
+the configuration.
+
+Configuration options allow enabling and disabling the cache expiration
duration.
diff --git
a/persistence/nosql/persistence/cdi/quarkus-distcache/build.gradle.kts
b/persistence/nosql/persistence/cdi/quarkus-distcache/build.gradle.kts
new file mode 100644
index 000000000..bcb65de5c
--- /dev/null
+++ b/persistence/nosql/persistence/cdi/quarkus-distcache/build.gradle.kts
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+ id("org.kordamp.gradle.jandex")
+ id("polaris-server")
+}
+
+description = "Polaris NoSQL persistence, distributed cache invalidation for
Quarkus."
+
+dependencies {
+ implementation(project(":polaris-persistence-nosql-cdi-common"))
+ implementation(project(":polaris-persistence-nosql-api"))
+
+ implementation(platform(libs.jackson.bom))
+ implementation("com.fasterxml.jackson.core:jackson-annotations")
+ implementation("com.fasterxml.jackson.core:jackson-databind")
+
+ compileOnly(libs.smallrye.config.core)
+
+ // Immutables annotations and annotation processor (declared once).
+ compileOnly(project(":polaris-immutables"))
+ annotationProcessor(project(":polaris-immutables", configuration =
"processor"))
+
+ implementation(platform(libs.quarkus.bom))
+ implementation("io.quarkus:quarkus-core")
+ implementation("io.quarkus:quarkus-vertx-http")
+
+ implementation(libs.jakarta.ws.rs.api)
+
+ implementation(libs.guava)
+ implementation(libs.slf4j.api)
+
+ compileOnly(libs.jakarta.annotation.api)
+ compileOnly(libs.jakarta.validation.api)
+ compileOnly(libs.jakarta.inject.api)
+ compileOnly(libs.jakarta.enterprise.cdi.api)
+
+ // Must stick with the Quarkus platform versions of Vert.X
+ // (signature of io.vertx.core.Vertx.createHttpClient() changed from 4.5 to
5.0)
+ testImplementation(enforcedPlatform(libs.quarkus.bom))
+}
diff --git
a/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/AddressResolver.java
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/AddressResolver.java
new file mode 100644
index 000000000..1e3806eac
--- /dev/null
+++
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/AddressResolver.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.quarkus.distcache;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.net.NetworkInterface.networkInterfaces;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toUnmodifiableSet;
+
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.dns.DnsClient;
+import io.vertx.core.dns.DnsClientOptions;
+import java.net.InetAddress;
+import java.net.InterfaceAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Vert.x based address resolver.
+ *
+ * <p>Resolves names to both IPv4 and IPv6 addresses using a given
search-list. These
+ * functionalities are not supported via {@code InetAddress}.
+ */
+record AddressResolver(DnsClient dnsClient, List<String> searchList) {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AddressResolver.class);
+
+ /** Set of all locally bound IP addresses. */
+ static final Set<String> LOCAL_ADDRESSES;
+
+ private static final boolean IP_V4_ONLY;
+
+ static {
+ try {
+ LOCAL_ADDRESSES =
+ networkInterfaces()
+ .flatMap(
+ ni ->
+ ni.getInterfaceAddresses().stream()
+ // Need to do this InetAddress->byte[]->InetAddress
dance to get rid of
+ // host-address suffixes as in `0:0:0:0:0:0:0:1%lo`
+ .map(InterfaceAddress::getAddress)
+ .map(InetAddress::getAddress)
+ .map(
+ a -> {
+ try {
+ return InetAddress.getByAddress(a);
+ } catch (UnknownHostException e) {
+ // Should never happen when calling
getByAddress() with an IPv4 or
+ // IPv6 address
+ throw new RuntimeException(e);
+ }
+ })
+ .map(InetAddress::getHostAddress))
+ .collect(toUnmodifiableSet());
+
+ IP_V4_ONLY =
Boolean.parseBoolean(System.getProperty("java.net.preferIPv4Stack", "false"));
+ } catch (SocketException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Uses a "default" {@link DnsClient} using the first {@code nameserver} and
the {@code search}
+ * list configured in {@code /etc/resolv.conf}.
+ */
+ AddressResolver(Vertx vertx, long queryTimeoutMillis) {
+ this(createDnsClient(vertx, queryTimeoutMillis),
ResolvConf.system().getSearchList());
+ }
+
+ /**
+ * Creates a "default" {@link DnsClient} using the first nameserver
configured in {@code
+ * /etc/resolv.conf}.
+ */
+ private static DnsClient createDnsClient(Vertx vertx, long
queryTimeoutMillis) {
+ var nameservers = ResolvConf.system().getNameservers();
+ checkState(!nameservers.isEmpty(), "No nameserver configured in
/etc/resolv.conf");
+ var nameserver = nameservers.getFirst();
+ LOGGER.info(
+ "Using nameserver {}/{} with search list {}",
+ nameserver.getHostName(),
+ nameserver.getAddress().getHostAddress(),
+ ResolvConf.system().getSearchList());
+ return vertx.createDnsClient(
+ new DnsClientOptions()
+ // 5 seconds should be enough to resolve
+ .setQueryTimeout(queryTimeoutMillis)
+ .setHost(nameserver.getAddress().getHostAddress())
+ .setPort(nameserver.getPort()));
+ }
+
+ private Future<List<String>> resolveSingle(String name) {
+ var resultA = dnsClient.resolveA(name);
+ if (IP_V4_ONLY) {
+ return resultA;
+ }
+ return resultA.compose(
+ a ->
+ dnsClient
+ .resolveAAAA(name)
+ .map(aaaa -> Stream.concat(aaaa.stream(),
a.stream()).collect(toList())));
+ }
+
+ /** Resolve a single name, used by {@link #resolveAll(List)}. */
+ Future<List<String>> resolve(String name) {
+ if (name.startsWith("=")) {
+ return Future.succeededFuture(List.of(name.substring(1)));
+ }
+
+ // By convention, do not consult the 'search' list, when the name to query
ends with a dot.
+ var exact = name.endsWith(".");
+ var query = exact ? name.substring(0, name.length() - 1) : name;
+ var future = resolveSingle(query);
+ if (!exact) {
+ // Consult the 'search' list if the above 'resolveSingle' fails.
+ for (var search : searchList) {
+ future = future.recover(t -> resolveSingle(query + '.' + search));
+ }
+ }
+
+ return future;
+ }
+
+ /** Resolve all names in parallel. */
+ Future<List<String>> resolveAll(List<String> names) {
+ var composite =
Future.all(names.stream().map(this::resolve).collect(toList()));
+ return composite.map(
+ c ->
+ IntStream.range(0, c.size())
+ .mapToObj(c::resultAt)
+ .map(
+ e -> {
+ @SuppressWarnings("unchecked")
+ var casted = (List<String>) e;
+ return casted.stream();
+ })
+ .reduce(Stream::concat)
+ .map(s -> s.collect(toList()))
+ .orElse(List.of()));
+ }
+}
diff --git
a/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationInfra.java
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationInfra.java
new file mode 100644
index 000000000..e2d0f1b90
--- /dev/null
+++
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationInfra.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.quarkus.distcache;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Produces;
+import java.util.UUID;
+
+@ApplicationScoped
+class CacheInvalidationInfra {
+
+ /** Produces a random, ephemeral server instance ID. */
+ @Produces
+ @ApplicationScoped
+ ServerInstanceId ephemeralServerInstanceId() {
+ return ServerInstanceId.of(UUID.randomUUID().toString());
+ }
+}
diff --git
a/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationReceiver.java
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationReceiver.java
new file mode 100644
index 000000000..d106043fc
--- /dev/null
+++
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationReceiver.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.quarkus.distcache;
+
+import static
com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static java.util.Collections.emptyList;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.quarkus.vertx.http.ManagementInterface;
+import io.vertx.ext.web.RoutingContext;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.event.Observes;
+import jakarta.inject.Inject;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+import org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations;
+import
org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.CacheInvalidationEvictObj;
+import
org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.CacheInvalidationEvictReference;
+import
org.apache.polaris.persistence.nosql.api.cache.DistributedCacheInvalidation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Receiver for distributed cache invalidation messages.
+ *
+ * <p>This bean is automatically initialized via the {@code @Observes}
annotation of {@link
+ * #registerManagementRoutes(ManagementInterface)}. See <a
+ *
href="https://quarkus.io/guides/management-interface-reference#management-endpoint-application">Quarkus
+ * docs on management endpoint applications</a>.
+ */
+@ApplicationScoped
+class CacheInvalidationReceiver {
+ static final String CACHE_INVALIDATION_TOKEN_HEADER =
"Polaris-Cache-Invalidation-Token";
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CacheInvalidationReceiver.class);
+
+ private final DistributedCacheInvalidation distributedCacheInvalidation;
+ private final String serverInstanceId;
+ private final Set<String> validTokens;
+ private final String invalidationPath;
+ private final ObjectMapper objectMapper;
+
+ @SuppressWarnings("CdiInjectionPointsInspection")
+ @Inject
+ CacheInvalidationReceiver(
+ QuarkusDistributedCacheInvalidationsConfig storeConfig,
+ ServerInstanceId serverInstanceId,
+ DistributedCacheInvalidation.Receiver distributedCacheInvalidation) {
+ this.distributedCacheInvalidation = distributedCacheInvalidation;
+ this.serverInstanceId = serverInstanceId.instanceId();
+ this.invalidationPath = storeConfig.cacheInvalidationUri();
+ this.validTokens =
+ new
HashSet<>(storeConfig.cacheInvalidationValidTokens().orElse(emptyList()));
+ this.objectMapper =
+ new ObjectMapper()
+ // forward compatibility
+ .disable(FAIL_ON_UNKNOWN_PROPERTIES);
+ }
+
+ void registerManagementRoutes(@Observes ManagementInterface mi) {
+ mi.router().post(invalidationPath).handler(this::cacheInvalidations);
+ LOGGER.info("Registered cache invalidation management endpoint {}",
invalidationPath);
+ }
+
+ void cacheInvalidations(RoutingContext rc) {
+ var request = rc.request();
+ var senderId = request.getParam("sender");
+ var token = request.getHeader(CACHE_INVALIDATION_TOKEN_HEADER);
+
+ cacheInvalidations(
+ rc,
+ () -> {
+ try {
+ var json = rc.body().asString();
+ if (json == null || json.isEmpty()) {
+ return CacheInvalidations.cacheInvalidations(emptyList());
+ }
+ return objectMapper.readValue(json, CacheInvalidations.class);
+ } catch (Exception e) {
+ LOGGER.error("Failed to deserialize cache invalidation", e);
+ return CacheInvalidations.cacheInvalidations(emptyList());
+ }
+ },
+ senderId,
+ token);
+ }
+
+ void cacheInvalidations(
+ RoutingContext rc,
+ Supplier<CacheInvalidations> invalidations,
+ String senderId,
+ String token) {
+ if (token == null || !validTokens.contains(token)) {
+ LOGGER.warn("Received cache invalidation with invalid token {}", token);
+ responseInvalidToken(rc);
+ return;
+ }
+ if (serverInstanceId.equals(senderId)) {
+ LOGGER.trace("Ignoring invalidations from local instance");
+ responseNoContent(rc);
+ return;
+ }
+ if (!"application/json".equals(rc.request().getHeader("Content-Type"))) {
+ LOGGER.warn("Received cache invalidation with invalid HTTP content
type");
+ responseInvalidContentType(rc);
+ return;
+ }
+
+ List<CacheInvalidations.CacheInvalidation> invalidationList;
+ try {
+ invalidationList = invalidations.get().invalidations();
+ } catch (RuntimeException e) {
+ responseServerError(rc);
+ return;
+ }
+
+ var cacheInvalidation = distributedCacheInvalidation;
+ if (cacheInvalidation != null) {
+ for (CacheInvalidations.CacheInvalidation invalidation :
invalidationList) {
+ switch (invalidation.type()) {
+ case CacheInvalidationEvictObj.TYPE -> {
+ var putObj = (CacheInvalidationEvictObj) invalidation;
+ cacheInvalidation.evictObj(putObj.realmId(), putObj.id());
+ }
+ case CacheInvalidationEvictReference.TYPE -> {
+ var putReference = (CacheInvalidationEvictReference) invalidation;
+ cacheInvalidation.evictReference(putReference.realmId(),
putReference.ref());
+ }
+ default -> {
+ // nothing we can do about a new invalidation type here
+ }
+ }
+ }
+ }
+
+ responseNoContent(rc);
+ }
+
+ private void responseServerError(RoutingContext rc) {
+ rc.response().setStatusCode(500).setStatusMessage("Server error parsing
request body").end();
+ }
+
+ private void responseInvalidToken(RoutingContext rc) {
+ rc.response().setStatusCode(400).setStatusMessage("Invalid token").end();
+ }
+
+ private void responseInvalidContentType(RoutingContext rc) {
+ rc.response().setStatusCode(415).setStatusMessage("Unsupported media
type").end();
+ }
+
+ private void responseNoContent(RoutingContext rc) {
+ rc.response().setStatusCode(204).setStatusMessage("No content").end();
+ }
+}
diff --git
a/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationSender.java
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationSender.java
new file mode 100644
index 000000000..b23731712
--- /dev/null
+++
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationSender.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.quarkus.distcache;
+
+import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
+import static java.util.Collections.emptyList;
+import static
org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.CacheInvalidationEvictObj.cacheInvalidationEvictObj;
+import static
org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.CacheInvalidationEvictReference.cacheInvalidationEvictReference;
+import static
org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.cacheInvalidations;
+import static
org.apache.polaris.persistence.nosql.quarkus.distcache.AddressResolver.LOCAL_ADDRESSES;
+import static
org.apache.polaris.persistence.nosql.quarkus.distcache.CacheInvalidationReceiver.CACHE_INVALIDATION_TOKEN_HEADER;
+import static
org.apache.polaris.persistence.nosql.quarkus.distcache.QuarkusDistributedCacheInvalidationsConfig.CACHE_INVALIDATIONS_CONFIG_PREFIX;
+import static
org.apache.polaris.persistence.nosql.quarkus.distcache.QuarkusDistributedCacheInvalidationsConfig.CONFIG_SERVICE_NAMES;
+import static
org.apache.polaris.persistence.nosql.quarkus.distcache.QuarkusDistributedCacheInvalidationsConfig.CONFIG_VALID_TOKENS;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.quarkus.runtime.Startup;
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpClient;
+import io.vertx.core.http.HttpClientResponse;
+import io.vertx.core.http.HttpMethod;
+import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import
org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.CacheInvalidation;
+import
org.apache.polaris.persistence.nosql.api.cache.DistributedCacheInvalidation;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sender for distributed cache invalidation messages.
+ *
+ * <p>This bean is automatically initialized via the {@code @Startup}
annotation, and automatically
+ * injected via the implemented {@link DistributedCacheInvalidation.Sender}
interface.
+ */
+@ApplicationScoped
+@Startup
+class CacheInvalidationSender implements DistributedCacheInvalidation.Sender {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CacheInvalidationSender.class);
+
+ private final Vertx vertx;
+ private final long serviceNameLookupIntervalMillis;
+
+ private final HttpClient httpClient;
+ private final AddressResolver addressResolver;
+
+ private final List<String> serviceNames;
+ private final int httpPort;
+ private final String invalidationUri;
+ private final long requestTimeout;
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+ private final Lock lock = new ReentrantLock();
+ private final int batchSize;
+ private final BlockingQueue<CacheInvalidation> invalidations = new
LinkedBlockingQueue<>();
+ private boolean triggered;
+ private final String token;
+
+ /** Contains the IPv4/6 addresses resolved from {@link #serviceNames}. */
+ private volatile List<String> resolvedAddresses = emptyList();
+
+ @Inject
+ CacheInvalidationSender(
+ @SuppressWarnings("CdiInjectionPointsInspection") Vertx vertx,
+ QuarkusDistributedCacheInvalidationsConfig config,
+ @ConfigProperty(name = "quarkus.management.port") int httpPort,
+ ServerInstanceId serverInstanceId) {
+ this.vertx = vertx;
+
+ this.addressResolver = new AddressResolver(vertx,
config.dnsQueryTimeout().toMillis());
+ this.requestTimeout =
+ config
+ .cacheInvalidationRequestTimeout()
+ .orElse(Duration.of(30, ChronoUnit.SECONDS))
+ .toMillis();
+ this.httpClient = vertx.createHttpClient();
+ this.serviceNames =
config.cacheInvalidationServiceNames().orElse(emptyList());
+ this.httpPort = httpPort;
+ this.invalidationUri =
+ config.cacheInvalidationUri() + "?sender=" +
serverInstanceId.instanceId();
+ this.serviceNameLookupIntervalMillis =
+ config.cacheInvalidationServiceNameLookupInterval().toMillis();
+ this.batchSize = config.cacheInvalidationBatchSize();
+ this.token =
config.cacheInvalidationValidTokens().map(List::getFirst).orElse(null);
+ if (!serviceNames.isEmpty()) {
+ try {
+ LOGGER.info("Sending remote cache invalidations to service name(s)
{}", serviceNames);
+ // Wait for the initial name service resolution to complete.
+ updateServiceNames().toCompletionStage().toCompletableFuture().get();
+ if (config.cacheInvalidationValidTokens().isEmpty()) {
+ LOGGER.warn(
+ "No token configured for cache invalidation messages - will not
send any invalidation message. You need to configure the token(s) via {}.{}",
+ CACHE_INVALIDATIONS_CONFIG_PREFIX,
+ CONFIG_VALID_TOKENS);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to resolve service names " + serviceNames + " for remote
cache invalidations",
+ (e instanceof ExecutionException) ? e.getCause() : e);
+ }
+ } else if (token != null) {
+ LOGGER.warn(
+ "No service names are configured to send cache invalidation messages
to - will not send any invalidation message. You need to configure the service
name(s) via {}.{}",
+ CACHE_INVALIDATIONS_CONFIG_PREFIX,
+ CONFIG_SERVICE_NAMES);
+ }
+ }
+
+ private Future<List<String>> updateServiceNames() {
+ var previous = new HashSet<>(resolvedAddresses);
+ return resolveServiceNames(serviceNames)
+ .map(all -> all.stream().filter(adr ->
!LOCAL_ADDRESSES.contains(adr)).toList())
+ .onSuccess(
+ all -> {
+ // refresh addresses regularly
+ scheduleServiceNameResolution();
+
+ var resolved = new HashSet<>(all);
+ if (!resolved.equals(previous)) {
+ LOGGER.info(
+ "Service names for remote cache invalidations {} now
resolve to {}",
+ serviceNames,
+ all);
+ }
+
+ updateResolvedAddresses(all);
+ })
+ .onFailure(
+ t -> {
+ // refresh addresses regularly
+ scheduleServiceNameResolution();
+
+ LOGGER.warn("Failed to resolve service names: {}", t.toString());
+ });
+ }
+
+ @VisibleForTesting
+ void updateResolvedAddresses(List<String> all) {
+ resolvedAddresses = all;
+ }
+
+ private void scheduleServiceNameResolution() {
+ vertx.setTimer(serviceNameLookupIntervalMillis, x -> updateServiceNames());
+ }
+
+ @VisibleForTesting
+ Future<List<String>> resolveServiceNames(List<String> serviceNames) {
+ return addressResolver.resolveAll(serviceNames);
+ }
+
+ void enqueue(CacheInvalidation invalidation) {
+ if (serviceNames.isEmpty() || token == null) {
+ // Don't do anything if there are no targets to send invalidations to or
if no token has
+ // been configured.
+ return;
+ }
+
+ lock.lock();
+ try {
+ invalidations.add(invalidation);
+
+ if (!triggered) {
+ LOGGER.trace("Triggered invalidation submission");
+ vertx.executeBlocking(this::sendInvalidations);
+ triggered = true;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Drains the queue of pending invalidations in batches and submits each
+ * batch to the currently resolved addresses, until the queue is empty.
+ *
+ * <p>Scheduled from {@code enqueue()} via {@code vertx.executeBlocking()}.
+ * Resets {@code triggered} under the lock once the queue is drained, so the
+ * next enqueued invalidation schedules a new run.
+ *
+ * @return always {@code null}
+ */
+ private Void sendInvalidations() {
+ // Honor the configured batch size; clamp to 1, because a limit of 0 would
+ // drain nothing and the loop would stop without sending anything.
+ var maxBatch = Math.max(1, batchSize);
+ var batch = new ArrayList<CacheInvalidation>(maxBatch);
+ try {
+ while (true) {
+ lock.lock();
+ try {
+ invalidations.drainTo(batch, maxBatch);
+ if (batch.isEmpty()) {
+ LOGGER.trace("Done sending invalidations");
+ triggered = false;
+ break;
+ }
+ } finally {
+ lock.unlock();
+ }
+ submit(batch, resolvedAddresses);
+ // Fresh list per batch: the submitted one is still referenced by the
+ // asynchronous completion handlers registered in submit().
+ batch = new ArrayList<>(maxBatch);
+ }
+ } finally {
+ // Handle the very unlikely case that the call to submit() failed and we
cannot be sure that
+ // the current batch was submitted.
+ if (!batch.isEmpty()) {
+ lock.lock();
+ try {
+ invalidations.addAll(batch);
+ triggered = false;
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+ return null;
+ }
+
+ @VisibleForTesting
+ List<Future<Map.Entry<HttpClientResponse, Buffer>>> submit(
+ List<CacheInvalidation> batch, List<String> resolvedAddresses) {
+ LOGGER.trace("Submitting {} invalidations", batch.size());
+
+ String json;
+ try {
+ json = objectMapper.writeValueAsString(cacheInvalidations(batch));
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+
+ var futures =
+ new ArrayList<Future<Map.Entry<HttpClientResponse,
Buffer>>>(resolvedAddresses.size());
+ for (var address : resolvedAddresses) {
+ futures.add(
+ httpClient
+ .request(HttpMethod.POST, httpPort, address, invalidationUri)
+ .compose(
+ req ->
+ req.putHeader("Content-Type", APPLICATION_JSON)
+ .putHeader(CACHE_INVALIDATION_TOKEN_HEADER, token)
+ .send(json))
+ .compose(resp -> resp.body().map(b -> Map.entry(resp, b)))
+ .timeout(requestTimeout, TimeUnit.MILLISECONDS)
+ .onComplete(
+ success -> {
+ var resp = success.getKey();
+ var statusCode = resp.statusCode();
+ if (statusCode != 200 && statusCode != 204) {
+ LOGGER.warn(
+ "{} cache invalidations could not be sent to {}:{}{}
- HTTP {}/{} - body: {}",
+ batch.size(),
+ address,
+ httpPort,
+ invalidationUri,
+ statusCode,
+ resp.statusMessage(),
+ success.getValue());
+ } else {
+ LOGGER.trace(
+ "{} cache invalidations sent to {}:{}",
batch.size(), address, httpPort);
+ }
+ },
+ failure -> {
+ if (failure instanceof SocketException
+ || failure instanceof UnknownHostException) {
+ LOGGER.warn(
+ "Technical network issue sending cache invalidations
to {}:{}{} : {}",
+ address,
+ httpPort,
+ invalidationUri,
+ failure.getMessage());
+ } else {
+ LOGGER.error(
+ "Technical failure sending cache invalidations to
{}:{}{}",
+ address,
+ httpPort,
+ invalidationUri,
+ failure);
+ }
+ }));
+ }
+ return futures;
+ }
+
+ @Override
+ public void evictReference(@Nonnull String repositoryId, @Nonnull String
refName) {
+ enqueue(cacheInvalidationEvictReference(repositoryId, refName));
+ }
+
+ @Override
+ public void evictObj(@Nonnull String repositoryId, @Nonnull ObjRef objId) {
+ enqueue(cacheInvalidationEvictObj(repositoryId, objId));
+ }
+}
diff --git
a/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/QuarkusDistributedCacheInvalidationsConfig.java
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/QuarkusDistributedCacheInvalidationsConfig.java
new file mode 100644
index 000000000..643e44cb6
--- /dev/null
+++
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/QuarkusDistributedCacheInvalidationsConfig.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.quarkus.distcache;
+
+import static
org.apache.polaris.persistence.nosql.quarkus.distcache.QuarkusDistributedCacheInvalidationsConfig.CACHE_INVALIDATIONS_CONFIG_PREFIX;
+
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithDefault;
+import io.smallrye.config.WithName;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/**
+ * Configuration mapping for distributed cache invalidations. All keys declared here live under
+ * the {@code polaris.persistence.distributed-cache-invalidations} prefix.
+ */
+@PolarisImmutable
+@ConfigMapping(prefix = CACHE_INVALIDATIONS_CONFIG_PREFIX)
+public interface QuarkusDistributedCacheInvalidationsConfig {
+
+ /** Common prefix of all configuration keys declared by this interface. */
+ String CACHE_INVALIDATIONS_CONFIG_PREFIX =
"polaris.persistence.distributed-cache-invalidations";
+ // Key names relative to the prefix above; each one is referenced by a @WithName annotation
+ // on the corresponding accessor below.
+ String CONFIG_VALID_TOKENS = "valid-tokens";
+ String CONFIG_SERVICE_NAMES = "service-names";
+ String CONFIG_URI = "uri";
+ String CONFIG_BATCH_SIZE = "batch-size";
+ String CONFIG_SERVICE_NAME_LOOKUP_INTERVAL = "service-name-lookup-interval";
+ String CONFIG_REQUEST_TIMEOUT = "request-timeout";
+ String CONFIG_DNS_QUERY_TIMEOUT = "dns.query-timeout";
+
+ /**
+ * Host names, IP addresses or Kubernetes headless-service names of all Polaris server
+ * instances accessing the same repository.
+ *
+ * <p>This value is automatically configured via the Polaris Helm chart, additional
+ * configuration is not required.
+ *
+ * <p>If you have your own Helm chart or custom deployment, make sure to configure the IPs of
+ * all Polaris instances here.
+ *
+ * <p>Names that start with an equal sign are not resolved but used "as is".
+ */
+ @WithName(CONFIG_SERVICE_NAMES)
+ Optional<List<String>> cacheInvalidationServiceNames();
+
+ /**
+ * List of cache-invalidation tokens to authenticate incoming cache-invalidation messages.
+ *
+ * <p>The first token is used in outgoing cache-invalidation messages.
+ */
+ @WithName(CONFIG_VALID_TOKENS)
+ Optional<List<String>> cacheInvalidationValidTokens();
+
+ /**
+ * URI path of the cache-invalidation endpoint. The endpoint is only available on the Quarkus
+ * management port, which defaults to 9000.
+ */
+ @WithName(CONFIG_URI)
+ @WithDefault("/polaris-management/cache-coherency")
+ String cacheInvalidationUri();
+
+ /**
+ * Interval of service-name lookups to resolve the {@linkplain #cacheInvalidationServiceNames()
+ * service names} into IP addresses. Defaults to 10 seconds.
+ */
+ @WithName(CONFIG_SERVICE_NAME_LOOKUP_INTERVAL)
+ @WithDefault("PT10S")
+ Duration cacheInvalidationServiceNameLookupInterval();
+
+ /**
+ * Maximum number of cache-invalidation messages to send in a single request to peer nodes.
+ * Defaults to 20.
+ */
+ @WithName(CONFIG_BATCH_SIZE)
+ @WithDefault("20")
+ int cacheInvalidationBatchSize();
+
+ /**
+ * Request timeout for sent cache-invalidation messages. Timeouts trigger a warning or error
+ * message.
+ *
+ * <p>No default is declared here; the value is empty unless configured.
+ */
+ @WithName(CONFIG_REQUEST_TIMEOUT)
+ Optional<Duration> cacheInvalidationRequestTimeout();
+
+ /** Timeout for DNS queries to resolve peer nodes. Defaults to 5 seconds. */
+ @WithName(CONFIG_DNS_QUERY_TIMEOUT)
+ @WithDefault("PT5S")
+ Duration dnsQueryTimeout();
+}
diff --git
a/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/ResolvConf.java
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/ResolvConf.java
new file mode 100644
index 000000000..690009719
--- /dev/null
+++
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/ResolvConf.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.quarkus.distcache;
+
+// Code mostly copied from io.netty.resolver.dns.ResolvConf, extended to also extract
+// the 'search' option values.
+//
+// Marker for Polaris LICENSE file - keep it
+// CODE_COPIED_TO_POLARIS
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.unmodifiableList;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Looks up the {@code nameserver}s and {@code search} domains from the {@code /etc/resolv.conf}
+ * file, intended for Linux and macOS only.
+ *
+ * <p>Extracting the {@code nameserver}s and {@code search} domains is necessary to correctly
+ * resolve names in k8s environments.
+ */
+final class ResolvConf {
+  private static final String NAMESERVER_KEYWORD = "nameserver";
+  private static final String SEARCH_KEYWORD = "search";
+  private static final int DNS_PORT = 53;
+
+  private final List<InetSocketAddress> nameservers;
+  private final List<String> searchList;
+
+  /**
+   * Reads from the given reader and extracts the {@code nameserver}s and {@code search} domains
+   * using the syntax of the {@code /etc/resolv.conf} file, see {@code man resolv.conf}.
+   *
+   * @param reader contents of {@code resolv.conf} are read from this {@link BufferedReader}, up
+   *     to the caller to close it
+   * @throws IOException if reading from {@code reader} fails
+   */
+  static ResolvConf fromReader(BufferedReader reader) throws IOException {
+    return new ResolvConf(reader);
+  }
+
+  /**
+   * Reads the given file and extracts the {@code nameserver}s and {@code search} domains using
+   * the syntax of the {@code /etc/resolv.conf} file, see {@code man resolv.conf}.
+   *
+   * @param file path of the file to read, decoded as UTF-8
+   * @throws IOException if the file cannot be opened or read
+   */
+  static ResolvConf fromFile(String file) throws IOException {
+    try (var fileReader = new FileReader(file, UTF_8);
+        BufferedReader reader = new BufferedReader(fileReader)) {
+      return fromReader(reader);
+    }
+  }
+
+  /**
+   * Returns the {@code nameserver}s and {@code search} domains from the {@code /etc/resolv.conf}
+   * file. The file is only read once during the lifetime of this class.
+   *
+   * @throws IllegalStateException if {@code /etc/resolv.conf} could not be read; the original
+   *     {@link IOException} is attached as the cause
+   */
+  static ResolvConf system() {
+    var resolvConf = ResolvConfLazy.machineResolvConf;
+    if (resolvConf != null) {
+      return resolvConf;
+    }
+    throw new IllegalStateException(
+        "/etc/resolv.conf could not be read", ResolvConfLazy.loadFailure);
+  }
+
+  /**
+   * Parses {@code resolv.conf} syntax line by line. Each line is classified by its leading
+   * keyword exactly once, so the value of a {@code nameserver} line is never re-interpreted as
+   * a {@code search} directive.
+   */
+  private ResolvConf(BufferedReader reader) throws IOException {
+    var nameservers = new ArrayList<InetSocketAddress>();
+    var searchList = new ArrayList<String>();
+    String ln;
+    while ((ln = reader.readLine()) != null) {
+      ln = ln.trim();
+      if (ln.isEmpty()) {
+        continue;
+      }
+
+      if (ln.startsWith(NAMESERVER_KEYWORD)) {
+        var address = ln.substring(NAMESERVER_KEYWORD.length()).trim();
+        nameservers.add(new InetSocketAddress(address, DNS_PORT));
+      } else if (ln.startsWith(SEARCH_KEYWORD)) {
+        var domains = ln.substring(SEARCH_KEYWORD.length()).trim();
+        // Domains may be separated by any run of blanks or tabs; a bare "search" line yields
+        // a single empty token, which must not end up in the search list.
+        Arrays.stream(domains.split("\\s+"))
+            .filter(domain -> !domain.isEmpty())
+            .forEach(searchList::add);
+      }
+    }
+    this.nameservers = unmodifiableList(nameservers);
+    this.searchList = unmodifiableList(searchList);
+  }
+
+  /** Returns the parsed name servers as an unmodifiable list, all using port 53. */
+  List<InetSocketAddress> getNameservers() {
+    return nameservers;
+  }
+
+  /** Returns the parsed search domains as an unmodifiable list, in file order. */
+  List<String> getSearchList() {
+    return searchList;
+  }
+
+  /** Holder class: {@code /etc/resolv.conf} is read when this class is initialized. */
+  private static final class ResolvConfLazy {
+    /** Parsed system configuration, or {@code null} if reading the file failed. */
+    static final ResolvConf machineResolvConf;
+
+    /** Failure that prevented reading the file, or {@code null} if reading succeeded. */
+    static final IOException loadFailure;
+
+    static {
+      ResolvConf resolvConf = null;
+      IOException failure = null;
+      try {
+        resolvConf = ResolvConf.fromFile("/etc/resolv.conf");
+      } catch (IOException e) {
+        failure = e;
+      }
+      machineResolvConf = resolvConf;
+      loadFailure = failure;
+    }
+  }
+}
diff --git
a/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/ServerInstanceId.java
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/ServerInstanceId.java
new file mode 100644
index 000000000..f39c0540f
--- /dev/null
+++
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/ServerInstanceId.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.quarkus.distcache;
+
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/** Immutable holder of a server instance ID string. */
+@PolarisImmutable
+interface ServerInstanceId {
+  /** The wrapped instance ID. */
+  String instanceId();
+
+  /** Creates a {@link ServerInstanceId} for the given ID via the generated immutable builder. */
+  static ServerInstanceId of(String instanceId) {
+    var populated = ImmutableServerInstanceId.builder().instanceId(instanceId);
+    return populated.build();
+  }
+}
diff --git
a/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/package-info.java
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/package-info.java
new file mode 100644
index 000000000..f10e51718
--- /dev/null
+++
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/package-info.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Distributed cache invalidation for the NoSQL cache.
+ *
+ * <p>Provides both a receiver and a sender for {@link
+ * org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations cache
invalidation messages}.
+ *
+ * <p>The receiver registers a route on the Quarkus management interface.
Senders emit asynchronous
+ * requests to the cache-invalidation management endpoint.
+ *
+ * <p>As a safety net, each Polaris instance includes its ephemeral instance-ID in the
+ * invalidation messages it sends, so that loopback messages are not processed.
+ *
+ * <p>All Polaris instances share a common token (dynamically generated via Helm chart
+ * mechanisms) to protect against externally injected, malicious invalidation messages.
+ */
+package org.apache.polaris.persistence.nosql.quarkus.distcache;
diff --git
a/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/HttpTestServer.java
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/HttpTestServer.java
new file mode 100644
index 000000000..c89a44545
--- /dev/null
+++
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/HttpTestServer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.quarkus.distcache;
+
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+
+/**
+ * HTTP test server backed by the JDK's {@link HttpServer}.
+ *
+ * <p>The given handler is registered for one context path. If the handler throws a {@link
+ * RuntimeException} or {@link Error}, response headers with status 503 are sent before the
+ * throwable is rethrown.
+ */
+public class HttpTestServer implements AutoCloseable {
+  private final HttpServer server;
+
+  /** Starts a server bound to {@code localhost} on an ephemeral port. */
+  public HttpTestServer(String context, HttpHandler handler) throws IOException {
+    this(new InetSocketAddress("localhost", 0), context, handler);
+  }
+
+  /**
+   * Starts a server bound to the given address, serving {@code handler} at {@code context}.
+   *
+   * @throws IOException if the server cannot be created
+   */
+  public HttpTestServer(InetSocketAddress bind, String context, HttpHandler handler)
+      throws IOException {
+    server = HttpServer.create(bind, 0);
+    server.createContext(context, reportingFailures(handler));
+    // A null executor selects the JDK's default executor.
+    server.setExecutor(null);
+
+    server.start();
+  }
+
+  /** Wraps {@code delegate} so that unchecked failures produce a 503 before propagating. */
+  private static HttpHandler reportingFailures(HttpHandler delegate) {
+    return exchange -> {
+      try {
+        delegate.handle(exchange);
+      } catch (RuntimeException | Error failure) {
+        exchange.sendResponseHeaders(503, 0);
+        throw failure;
+      }
+    };
+  }
+
+  /** Address the server is bound to, including the actual port. */
+  public InetSocketAddress getAddress() {
+    return server.getAddress();
+  }
+
+  /** Root URI ({@code http://<host-address>:<port>/}) of this server. */
+  public URI getUri() {
+    var host = getAddress().getAddress().getHostAddress();
+    var port = getAddress().getPort();
+    return URI.create("http://" + host + ":" + port + "/");
+  }
+
+  /** Stops the server immediately. */
+  @Override
+  public void close() {
+    server.stop(0);
+  }
+}
diff --git
a/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestAddressResolver.java
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestAddressResolver.java
new file mode 100644
index 000000000..3e08793f0
--- /dev/null
+++
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestAddressResolver.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.quarkus.distcache;
+
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.toList;
+import static
org.apache.polaris.persistence.nosql.quarkus.distcache.AddressResolver.LOCAL_ADDRESSES;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.dns.DnsException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for {@link AddressResolver}.
+ *
+ * <p>Each test creates its own {@link Vertx} instance; the resolver's DNS client and the
+ * {@link Vertx} instance are closed after each test.
+ */
+@ExtendWith(SoftAssertionsExtension.class)
+public class TestAddressResolver {
+  @InjectSoftAssertions protected SoftAssertions soft;
+
+  protected Vertx vertx;
+  AddressResolver addressResolver;
+
+  @BeforeEach
+  void setUp() {
+    vertx = Vertx.builder().build();
+  }
+
+  /** Closes the DNS client (if a resolver was created) and always closes Vert.x afterwards. */
+  @AfterEach
+  void tearDown() throws Exception {
+    try {
+      if (addressResolver != null) {
+        addressResolver
+            .dnsClient()
+            .close()
+            .toCompletionStage()
+            .toCompletableFuture()
+            .get(1, TimeUnit.MINUTES);
+      }
+    } finally {
+      try {
+        vertx.close().toCompletionStage().toCompletableFuture().get(1, TimeUnit.MINUTES);
+      } finally {
+        vertx = null;
+      }
+    }
+  }
+
+  /** Resolving an empty list of names yields an empty result. */
+  @Test
+  public void resolveNoName() throws Exception {
+    addressResolver = new AddressResolver(vertx, 5000);
+    soft.assertThat(
+            addressResolver
+                .resolveAll(Collections.emptyList())
+                .toCompletionStage()
+                .toCompletableFuture()
+                .get(1, TimeUnit.MINUTES))
+        .isEmpty();
+  }
+
+  /**
+   * Resolves {@code projectnessie.org} with and without the search list {@code org} and expects
+   * the same addresses for the plain, the search-list-completed and the fully qualified name.
+   *
+   * <p>NOTE: this test resolves a public host name and therefore needs working external DNS.
+   */
+  @Test
+  public void resolveGoodName() throws Exception {
+    addressResolver = new AddressResolver(vertx, 5000);
+
+    AddressResolver addressResolverWithSearch =
+        new AddressResolver(addressResolver.dnsClient(), List.of("org"));
+
+    List<String> withoutSearchList =
+        addressResolver
+            .resolveAll(singletonList("projectnessie.org"))
+            .toCompletionStage()
+            .toCompletableFuture()
+            .get(1, TimeUnit.MINUTES);
+    soft.assertThat(withoutSearchList).isNotEmpty();
+
+    List<String> withSearchList1 =
+        addressResolverWithSearch
+            .resolveAll(singletonList("projectnessie.org"))
+            .toCompletionStage()
+            .toCompletableFuture()
+            .get(1, TimeUnit.MINUTES);
+    soft.assertThat(withSearchList1).isNotEmpty();
+    soft.assertThat(withSearchList1).containsExactlyInAnyOrderElementsOf(withoutSearchList);
+
+    List<String> withSearchList2 =
+        addressResolverWithSearch
+            .resolveAll(singletonList("projectnessie"))
+            .toCompletionStage()
+            .toCompletableFuture()
+            .get(1, TimeUnit.MINUTES);
+    soft.assertThat(withSearchList2).isNotEmpty();
+    soft.assertThat(withSearchList2).containsExactlyInAnyOrderElementsOf(withoutSearchList);
+
+    List<String> withSearchListQualified =
+        addressResolverWithSearch
+            .resolveAll(singletonList("projectnessie.org."))
+            .toCompletionStage()
+            .toCompletableFuture()
+            .get(1, TimeUnit.MINUTES);
+    soft.assertThat(withSearchListQualified).isNotEmpty();
+    soft.assertThat(withSearchListQualified)
+        .containsExactlyInAnyOrderElementsOf(withoutSearchList);
+  }
+
+  /** {@code localhost} resolves to at least one of the IPv4/IPv6 loopback addresses. */
+  @Test
+  @DisabledOnOs(value = OS.MAC, disabledReason = "Resolving 'localhost' doesn't work on macOS")
+  public void resolveSingleName() throws Exception {
+    addressResolver = new AddressResolver(vertx, 5000);
+    soft.assertThat(
+            addressResolver
+                .resolveAll(singletonList("localhost"))
+                .toCompletionStage()
+                .toCompletableFuture()
+                .get(1, TimeUnit.MINUTES))
+        .isNotEmpty()
+        .containsAnyOf("0:0:0:0:0:0:0:1", "127.0.0.1");
+  }
+
+  /** Resolving a non-existing name fails with a {@link DnsException} as the cause. */
+  @Test
+  public void resolveBadName() {
+    addressResolver = new AddressResolver(vertx, 5000);
+    soft.assertThat(
+            addressResolver
+                .resolveAll(
+                    singletonList("wepofkjeopiwkf.wepofkeowpkfpoew.weopfkewopfk.local"))
+                .toCompletionStage()
+                .toCompletableFuture())
+        .failsWithin(1, TimeUnit.MINUTES)
+        .withThrowableThat()
+        .withCauseInstanceOf(DnsException.class);
+  }
+
+  /** All addresses resolved for {@code localhost} are contained in {@code LOCAL_ADDRESSES}. */
+  @Test
+  @DisabledOnOs(value = OS.MAC, disabledReason = "Resolving 'localhost' doesn't work on macOS")
+  public void resolveFilterLocalAddresses() throws Exception {
+    addressResolver = new AddressResolver(vertx, 5000);
+    soft.assertThat(
+            addressResolver
+                .resolveAll(singletonList("localhost"))
+                .map(
+                    s -> s.stream().filter(adr -> !LOCAL_ADDRESSES.contains(adr)).collect(toList()))
+                .toCompletionStage()
+                .toCompletableFuture()
+                .get(1, TimeUnit.MINUTES))
+        .isEmpty();
+  }
+}
diff --git
a/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestCacheInvalidationReceiver.java
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestCacheInvalidationReceiver.java
new file mode 100644
index 000000000..e6e7d4073
--- /dev/null
+++
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestCacheInvalidationReceiver.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.quarkus.distcache;
+
+import static java.util.Collections.singletonList;
+import static
org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.CacheInvalidationEvictObj.cacheInvalidationEvictObj;
+import static
org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.CacheInvalidationEvictReference.cacheInvalidationEvictReference;
+import static
org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.cacheInvalidations;
+import static
org.apache.polaris.persistence.nosql.quarkus.distcache.CacheInvalidationReceiver.CACHE_INVALIDATION_TOKEN_HEADER;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.vertx.core.Future;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.http.HttpServerResponse;
+import io.vertx.ext.web.RequestBody;
+import io.vertx.ext.web.RoutingContext;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations;
+import
org.apache.polaris.persistence.nosql.api.cache.DistributedCacheInvalidation;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link CacheInvalidationReceiver}, using Mockito mocks for the Vert.x routing
+ * context, the configuration and the downstream {@link DistributedCacheInvalidation.Receiver}.
+ */
+public class TestCacheInvalidationReceiver {
+ private static final ObjRef SOME_OBJ_REF = ObjRef.objRef("foo", 1234);
+
+ /**
+ * A request carrying a valid token and a sender ID different from the receiver's ID is
+ * expected to be answered with 204 and to forward both invalidations to the receiver.
+ */
+ @Test
+ public void senderReceiver() throws Exception {
+ var distributedCacheInvalidation =
mock(DistributedCacheInvalidation.Receiver.class);
+
+ var token = "cafe";
+ var tokens = singletonList(token);
+ var receiverId = ServerInstanceId.of("receiverId");
+ var senderId = ServerInstanceId.of("senderId");
+
+ var receiver = buildReceiver(tokens, receiverId,
distributedCacheInvalidation);
+
+ var invalidations = cacheInvalidations(allInvalidationTypes());
+
+ var rc =
+ expectResponse(
+ r -> {
+ when(r.getParam("sender")).thenReturn(senderId.instanceId());
+
when(r.getHeader(CACHE_INVALIDATION_TOKEN_HEADER)).thenReturn(token);
+ });
+ // The request body is the JSON serialization of the invalidations.
+ var reqBody = mock(RequestBody.class);
+ when(reqBody.asString()).thenReturn(new
ObjectMapper().writeValueAsString(invalidations));
+ when(rc.body()).thenReturn(reqBody);
+
+ receiver.cacheInvalidations(rc);
+
+ verify(rc.response()).setStatusCode(204);
+ verify(rc.response()).setStatusMessage("No content");
+
+ verify(distributedCacheInvalidation).evictObj("repo", SOME_OBJ_REF);
+ verify(distributedCacheInvalidation).evictReference("repo",
"refs/foo/bar");
+ verifyNoMoreInteractions(distributedCacheInvalidation);
+ }
+
+ /**
+ * With an empty list of configured tokens, a request is expected to be rejected with 400
+ * "Invalid token" and nothing is forwarded to the receiver.
+ */
+ @Test
+ public void doesNotAcceptInvalidationsWithoutTokens() {
+ var distributedCacheInvalidation =
mock(DistributedCacheInvalidation.Receiver.class);
+
+ var token = "cafe";
+ var tokens = List.<String>of();
+ var receiverId = ServerInstanceId.of("receiverId");
+ var senderId = ServerInstanceId.of("senderId");
+
+ var receiver = buildReceiver(tokens, receiverId,
distributedCacheInvalidation);
+
+ var rc = expectResponse();
+ receiver.cacheInvalidations(
+ rc, () -> cacheInvalidations(allInvalidationTypes()),
senderId.instanceId(), token);
+
+ verify(rc.response()).setStatusCode(400);
+ verify(rc.response()).setStatusMessage("Invalid token");
+
+ verifyNoMoreInteractions(distributedCacheInvalidation);
+ }
+
+ /**
+ * A request whose sender ID equals the receiver's own ID is expected to be answered with 204
+ * without forwarding any invalidation to the receiver.
+ */
+ @Test
+ public void receiveFromSelf() {
+ var distributedCacheInvalidation =
mock(DistributedCacheInvalidation.Receiver.class);
+
+ var token = "cafe";
+ var tokens = singletonList(token);
+ var receiverId = ServerInstanceId.of("receiverId");
+
+ var receiver = buildReceiver(tokens, receiverId,
distributedCacheInvalidation);
+
+ var rc = expectResponse();
+ receiver.cacheInvalidations(
+ rc, () -> cacheInvalidations(allInvalidationTypes()),
receiverId.instanceId(), token);
+
+ verify(rc.response()).setStatusCode(204);
+ verify(rc.response()).setStatusMessage("No content");
+
+ verifyNoMoreInteractions(distributedCacheInvalidation);
+ }
+
+ /**
+ * A request carrying a token that is not in the configured token list is expected to be
+ * rejected with 400 "Invalid token" and nothing is forwarded to the receiver.
+ */
+ @Test
+ public void unknownToken() {
+ var distributedCacheInvalidation =
mock(DistributedCacheInvalidation.Receiver.class);
+
+ var token = "cafe";
+ var tokens = singletonList(token);
+ var differentToken = "otherToken";
+ var receiverId = ServerInstanceId.of("receiverId");
+ var senderId = ServerInstanceId.of("senderId");
+
+ CacheInvalidationReceiver receiver =
+ buildReceiver(tokens, receiverId, distributedCacheInvalidation);
+
+ RoutingContext rc = expectResponse();
+ receiver.cacheInvalidations(
+ rc,
+ () -> cacheInvalidations(allInvalidationTypes()),
+ senderId.instanceId(),
+ differentToken);
+
+ verify(rc.response()).setStatusCode(400);
+ verify(rc.response()).setStatusMessage("Invalid token");
+
+ verifyNoMoreInteractions(distributedCacheInvalidation);
+ }
+
+ /** Same as {@link #expectResponse(Consumer)} without additional request stubbing. */
+ private RoutingContext expectResponse() {
+ return expectResponse(r -> {});
+ }
+
+ /**
+ * Builds a mocked {@link RoutingContext} with a mocked request and response.
+ *
+ * <p>The response mock returns itself from the status setters and a succeeded future from
+ * {@code end()}. The request mock reports {@code application/json} as its content type and
+ * can be stubbed further via {@code requestMocker}.
+ */
+ private RoutingContext expectResponse(Consumer<HttpServerRequest>
requestMocker) {
+ var response = mock(HttpServerResponse.class);
+ when(response.setStatusCode(anyInt())).thenReturn(response);
+ when(response.setStatusMessage(anyString())).thenReturn(response);
+ when(response.end()).thenReturn(Future.succeededFuture());
+
+ var request = mock(HttpServerRequest.class);
+ when(request.getHeader("Content-Type")).thenReturn("application/json");
+ requestMocker.accept(request);
+
+ var rc = mock(RoutingContext.class);
+ when(rc.response()).thenReturn(response);
+ when(rc.request()).thenReturn(request);
+ return rc;
+ }
+
+ /**
+ * Creates a {@link CacheInvalidationReceiver} whose mocked configuration returns the given
+ * valid tokens.
+ */
+ private static CacheInvalidationReceiver buildReceiver(
+ List<String> tokens,
+ ServerInstanceId receiverId,
+ DistributedCacheInvalidation.Receiver distCacheInvalidation) {
+ QuarkusDistributedCacheInvalidationsConfig config =
+ mock(QuarkusDistributedCacheInvalidationsConfig.class);
+
when(config.cacheInvalidationValidTokens()).thenReturn(Optional.of(tokens));
+
+ return new CacheInvalidationReceiver(config, receiverId,
distCacheInvalidation);
+ }
+
+ /** One evict-reference and one evict-object invalidation, both for repository "repo". */
+ List<CacheInvalidations.CacheInvalidation> allInvalidationTypes() {
+ return List.of(
+ cacheInvalidationEvictReference("repo", "refs/foo/bar"),
+ cacheInvalidationEvictObj("repo", SOME_OBJ_REF));
+ }
+}
diff --git
a/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestCacheInvalidationSender.java
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestCacheInvalidationSender.java
new file mode 100644
index 000000000..098174710
--- /dev/null
+++
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestCacheInvalidationSender.java
@@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.quarkus.distcache;
+
+import static io.vertx.core.Future.failedFuture;
+import static io.vertx.core.Future.succeededFuture;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.singletonList;
+import static
org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.CacheInvalidationEvictObj.cacheInvalidationEvictObj;
+import static
org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.CacheInvalidationEvictReference.cacheInvalidationEvictReference;
+import static
org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.cacheInvalidations;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpClientResponse;
+import java.io.InputStream;
+import java.net.URI;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import
org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.CacheInvalidation;
+import
org.apache.polaris.persistence.nosql.api.cache.DistributedCacheInvalidation;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+@ExtendWith(SoftAssertionsExtension.class)
+public class TestCacheInvalidationSender {
+ private static final ObjRef SOME_OBJ_REF = ObjRef.objRef("foo", 1234);
+
+ @InjectSoftAssertions protected SoftAssertions soft;
+
+ protected Vertx vertx;
+
+ @BeforeEach
+ void setUp() { // builds a fresh Vert.x instance before every test
+ vertx = Vertx.builder().build();
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ try {
+ vertx.close().toCompletionStage().toCompletableFuture().get(1,
TimeUnit.MINUTES); // waits for the close to complete, for at most one minute
+ } finally {
+ vertx = null; // cleared even if closing failed or timed out
+ }
+ }
+
+ @Test
+ public void serviceNameLookupFailure() { // a failing lookup must surface as an exception when constructing the sender
+ var senderId = ServerInstanceId.of("senderId");
+
+ var token = "token";
+ var tokens = singletonList(token);
+
+ var config =
+ buildConfig(
+ tokens,
+ Optional.of(singletonList("serviceName")),
+ Duration.ofSeconds(10),
+ Duration.ofSeconds(10));
+
+ soft.assertThatThrownBy(
+ () ->
+ new CacheInvalidationSender(vertx, config, 80, senderId) {
+ @Override
+ Future<List<String>> resolveServiceNames(List<String>
serviceNames) {
+ return failedFuture(new RuntimeException("foo")); // simulated lookup failure
+ }
+
+ @Override
+ List<Future<Map.Entry<HttpClientResponse, Buffer>>> submit(
+ List<CacheInvalidation> batch, List<String>
resolvedAddresses) {
+ soft.fail("Not expected"); // nothing may be submitted in this test
+ return null;
+ }
+ })
+ .hasMessage("Failed to resolve service names [serviceName] for remote
cache invalidations")
+ .cause()
+ .hasMessage("foo"); // the simulated lookup failure is expected as the cause
+ }
+
+ @Test
+ public void regularServiceNameLookups() throws Exception { // submits must use the most recently resolved addresses
+ var senderId = ServerInstanceId.of("senderId");
+
+ var token = "token";
+ var tokens = singletonList(token);
+
+ var config =
+ buildConfig(
+ tokens,
+ Optional.of(singletonList("serviceName")),
+ Duration.ofMillis(1), // service-name lookup interval
+ Duration.ofSeconds(10));
+
+ var resolveSemaphore = new Semaphore(1); // one initial permit, consumed by the first resolveServiceNames() call
+ var continueSemaphore = new Semaphore(0); // released whenever resolveServiceNames() returns
+ var submittedSemaphore = new Semaphore(0); // released on every submit()
+ var updateResolvedSemaphore = new Semaphore(0); // released after every updateResolvedAddresses()
+ var currentAddresses = List.of("127.1.1.1");
+ var resolveResult = new
AtomicReference<>(succeededFuture(currentAddresses));
+ var submitResolvedAddresses = new AtomicReference<List<String>>(); // addresses seen by the last submit()
+
+ try {
+ CacheInvalidationSender sender =
+ new CacheInvalidationSender(vertx, config, 80, senderId) {
+ @Override
+ Future<List<String>> resolveServiceNames(List<String>
serviceNames) {
+ try {
+ assertThat(resolveSemaphore.tryAcquire(30,
TimeUnit.SECONDS)).isTrue(); // each lookup waits until the test grants a permit
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ try {
+ return resolveResult.get(); // whatever result the test has currently configured
+ } finally {
+ continueSemaphore.release();
+ }
+ }
+
+ @Override
+ void updateResolvedAddresses(List<String> all) {
+ try {
+ super.updateResolvedAddresses(all);
+ } finally {
+ updateResolvedSemaphore.release(); // signalled even if the super call throws
+ }
+ }
+
+ @Override
+ List<Future<Map.Entry<HttpClientResponse, Buffer>>> submit(
+ List<CacheInvalidation> batch, List<String> resolvedAddresses)
{
+ submitResolvedAddresses.set(resolvedAddresses); // record instead of sending anything
+ submittedSemaphore.release();
+ return null;
+ }
+ };
+
+ // "consume" after initial, blocking call to resolveServiceNames() from
the constructor
+ assertThat(continueSemaphore.tryAcquire(30, TimeUnit.SECONDS)).isTrue();
+ assertThat(updateResolvedSemaphore.tryAcquire(30,
TimeUnit.SECONDS)).isTrue();
+
+ // Send an invalidation, compare addresses
+ sender.evictObj("repo", SOME_OBJ_REF);
+ assertThat(submittedSemaphore.tryAcquire(30, TimeUnit.SECONDS)).isTrue();
+ soft.assertThat(submitResolvedAddresses.get())
+ .containsExactlyInAnyOrderElementsOf(currentAddresses);
+
+ // simulate change of resolved addresses
+ currentAddresses = List.of("127.2.2.2", "127.3.3.3");
+ resolveResult.set(succeededFuture(currentAddresses));
+ resolveSemaphore.release();
+ // wait until next call to resolveServiceNames() has been triggered
+ assertThat(continueSemaphore.tryAcquire(30, TimeUnit.SECONDS)).isTrue();
+ assertThat(updateResolvedSemaphore.tryAcquire(30,
TimeUnit.SECONDS)).isTrue();
+
+ // Send another invalidation, compare addresses
+ sender.evictObj("repo", SOME_OBJ_REF);
+ assertThat(submittedSemaphore.tryAcquire(30, TimeUnit.SECONDS)).isTrue();
+ soft.assertThat(submitResolvedAddresses.get())
+ .containsExactlyInAnyOrderElementsOf(currentAddresses);
+
+ // simulate a failure resolving the addresses
+ resolveResult.set(failedFuture(new RuntimeException("blah")));
+ resolveSemaphore.release();
+ // wait until next call to resolveServiceNames() has been triggered
+ assertThat(continueSemaphore.tryAcquire(30, TimeUnit.SECONDS)).isTrue(); // no wait on updateResolvedSemaphore in the failure case
+
+ // Send another invalidation, compare addresses
+ sender.evictObj("repo", SOME_OBJ_REF);
+ assertThat(submittedSemaphore.tryAcquire(30, TimeUnit.SECONDS)).isTrue();
+ soft.assertThat(submitResolvedAddresses.get())
+ .containsExactlyInAnyOrderElementsOf(currentAddresses); // currentAddresses is unchanged: the previously resolved addresses are expected
+
+ // simulate another change of resolved addresses
+ currentAddresses = List.of("127.4.4.4", "127.5.5.5");
+ resolveResult.set(succeededFuture(currentAddresses));
+ resolveSemaphore.release();
+ // wait until next call to resolveServiceNames() has been triggered
+ assertThat(continueSemaphore.tryAcquire(30, TimeUnit.SECONDS)).isTrue();
+ assertThat(updateResolvedSemaphore.tryAcquire(30,
TimeUnit.SECONDS)).isTrue();
+
+ // Send another invalidation, compare addresses
+ sender.evictObj("repo", SOME_OBJ_REF);
+ assertThat(submittedSemaphore.tryAcquire(30, TimeUnit.SECONDS)).isTrue();
+ soft.assertThat(submitResolvedAddresses.get())
+ .containsExactlyInAnyOrderElementsOf(currentAddresses);
+ } finally {
+ // Permit a lot, the test might otherwise "hang" in resolveServiceNames()
+ resolveSemaphore.release(10_000_000);
+ }
+ }
+
+ @Test
+ public void noServiceNames() throws Exception { // without service names, an eviction is enqueued but never submitted
+ var senderId = ServerInstanceId.of("senderId");
+
+ var token = "token";
+ var tokens = singletonList(token);
+
+ var config =
+ buildConfig(tokens, Optional.empty(), Duration.ofSeconds(10),
Duration.ofSeconds(10));
+
+ var sender =
+ new CacheInvalidationSender(vertx, config, 80, senderId) {
+ @Override
+ Future<List<String>> resolveServiceNames(List<String> serviceNames) {
+ return succeededFuture(List.of());
+ }
+
+ @Override
+ List<Future<Map.Entry<HttpClientResponse, Buffer>>> submit(
+ List<CacheInvalidation> batch, List<String> resolvedAddresses) {
+ soft.fail("Not expected"); // a submit would mean addresses were resolved after all
+ return null;
+ }
+ };
+
+ var senderSpy = spy(sender);
+
+ senderSpy.evictObj("repo", SOME_OBJ_REF);
+
+ // Hard to test that nothing is done, if the list of resolved addresses is
empty, but the
+ // condition is easy. If this test is flaky, then there's something
broken.
+ Thread.sleep(100L);
+
+ verify(senderSpy).evictObj("repo", SOME_OBJ_REF);
+ verify(senderSpy).enqueue(cacheInvalidationEvictObj("repo", SOME_OBJ_REF));
+ verifyNoMoreInteractions(senderSpy); // in particular: no submit() on the spy
+ }
+
+ @ParameterizedTest
+ @MethodSource("invalidations")
+ public void mockedSendSingleInvalidation( // one eviction call must result in one submit() with the expected payload
+ Consumer<DistributedCacheInvalidation> invalidation, CacheInvalidation
expected)
+ throws Exception {
+ var senderId = ServerInstanceId.of("senderId");
+
+ var token = "token";
+ var tokens = singletonList(token);
+
+ var serviceNames = singletonList("service-name");
+ var resolvedServiceNames = singletonList("service-name-resolved");
+
+ var config =
+ buildConfig(
+ tokens, Optional.of(serviceNames), Duration.ofSeconds(10),
Duration.ofSeconds(10));
+
+ var sem = new Semaphore(0); // released once submit() has been called
+ var sender =
+ new CacheInvalidationSender(vertx, config, 80, senderId) {
+ @Override
+ Future<List<String>> resolveServiceNames(List<String> serviceNames) {
+ return succeededFuture(resolvedServiceNames);
+ }
+
+ @Override
+ List<Future<Map.Entry<HttpClientResponse, Buffer>>> submit(
+ List<CacheInvalidation> batch, List<String> resolvedAddresses) {
+ sem.release(1);
+ return null;
+ }
+ };
+
+ var senderSpy = spy(sender);
+
+ invalidation.accept(senderSpy);
+ assertThat(sem.tryAcquire(30, TimeUnit.SECONDS)).isTrue(); // wait for submit() before verifying
+
+ verify(senderSpy).submit(singletonList(expected), resolvedServiceNames);
+ }
+
+ @Test
+ public void mockedAllInvalidationTypes() throws Exception { // every case from invalidations() must reach submit()
+ var senderId = ServerInstanceId.of("senderId");
+
+ var token = "token";
+ var tokens = singletonList(token);
+
+ var serviceNames = singletonList("service-name");
+ var resolvedServiceNames = singletonList("service-name-resolved");
+
+ var config =
+ buildConfig(
+ tokens, Optional.of(serviceNames), Duration.ofSeconds(10),
Duration.ofSeconds(10));
+
+ var sem = new Semaphore(0); // counts the invalidations handed to submit()
+ var received = new ConcurrentLinkedQueue<>();
+ var sender =
+ new CacheInvalidationSender(vertx, config, 80, senderId) {
+ @Override
+ Future<List<String>> resolveServiceNames(List<String> serviceNames) {
+ return succeededFuture(resolvedServiceNames);
+ }
+
+ @Override
+ List<Future<Map.Entry<HttpClientResponse, Buffer>>> submit(
+ List<CacheInvalidation> batch, List<String> resolvedAddresses) {
+ received.addAll(batch);
+ soft.assertThat(resolvedAddresses)
+ .containsExactlyInAnyOrderElementsOf(resolvedServiceNames);
+ sem.release(batch.size()); // one permit per invalidation, however they are batched
+ return null;
+ }
+ };
+
+ var senderSpy = spy(sender);
+
+ var expected =
+ invalidations().map(args ->
args.get()[1]).map(CacheInvalidation.class::cast).toList(); // second argument of each case: the expected invalidation
+
+ invalidations()
+ .map(args -> args.get()[0])
+ .map(
+ i -> {
+ @SuppressWarnings({"UnnecessaryLocalVariable", "unchecked"})
+ Consumer<DistributedCacheInvalidation> r =
(Consumer<DistributedCacheInvalidation>) i;
+ return r;
+ })
+ .forEach(i -> i.accept(senderSpy)); // first argument of each case: the eviction call to perform
+
+ assertThat(sem.tryAcquire(expected.size(), 30, TimeUnit.SECONDS)).isTrue();
+
+ soft.assertThat(received).containsExactlyInAnyOrderElementsOf(expected);
+ }
+
+ @ParameterizedTest
+ @MethodSource("invalidations")
+ public void sendSingleInvalidation( // calls submit() directly against a local HTTP test server
+ @SuppressWarnings("unused") Consumer<DistributedCacheInvalidation>
invalidation,
+ CacheInvalidation expected)
+ throws Exception {
+ var senderId = ServerInstanceId.of("senderId");
+
+ var token = "token";
+ var tokens = singletonList(token);
+
+ var serviceNames = singletonList("service-name");
+
+ var config =
+ buildConfig(
+ tokens, Optional.of(serviceNames), Duration.ofSeconds(10),
Duration.ofSeconds(10));
+
+ var mapper = new ObjectMapper();
+
+ var body = new AtomicReference<String>(); // request body captured by the test server
+ var reqUri = new AtomicReference<URI>(); // request URI captured by the test server
+ try (var receiver =
+ new HttpTestServer(
+ config.cacheInvalidationUri(),
+ exchange -> {
+ try (InputStream requestBody = exchange.getRequestBody()) {
+ body.set(new String(requestBody.readAllBytes(), UTF_8));
+ }
+ reqUri.set(exchange.getRequestURI());
+ exchange.sendResponseHeaders(204, 0);
+ exchange.getResponseBody().close();
+ })) {
+
+ var uri = receiver.getUri();
+
+ var sender =
+ new CacheInvalidationSender(vertx, config, uri.getPort(), senderId) {
+ @Override
+ Future<List<String>> resolveServiceNames(List<String>
serviceNames) {
+ return succeededFuture(List.of(uri.getHost())); // resolve to the test server's host
+ }
+ };
+
+ var future =
+ CompletableFuture.allOf(
+ sender.submit(singletonList(expected),
singletonList(uri.getHost())).stream()
+ .map(Future::toCompletionStage)
+ .map(CompletionStage::toCompletableFuture)
+ .toArray(CompletableFuture[]::new));
+
+ soft.assertThat(future).succeedsWithin(30, TimeUnit.SECONDS);
+
+ soft.assertThat(body.get())
+
.isEqualTo(mapper.writeValueAsString(cacheInvalidations(singletonList(expected))));
+
soft.assertThat(reqUri.get()).extracting(URI::getPath).isEqualTo("/foo/bar/");
+ soft.assertThat(reqUri.get())
+ .extracting(URI::getQuery)
+ .isEqualTo("sender=" + senderId.instanceId()); // the sender's instance id is expected as the query string
+ }
+ }
+
+ @Test
+ public void allInvalidationTypes() throws Exception { // submits all cases from invalidations() in one call to a local HTTP test server
+ var senderId = ServerInstanceId.of("senderId");
+
+ var token = "token";
+ var tokens = singletonList(token);
+
+ var serviceNames = singletonList("service-name");
+
+ var config =
+ buildConfig(
+ tokens, Optional.of(serviceNames), Duration.ofSeconds(10),
Duration.ofSeconds(30));
+
+ var expected =
+ invalidations().map(args ->
args.get()[1]).map(CacheInvalidation.class::cast).toList();
+
+ var mapper = new ObjectMapper();
+
+ var body = new AtomicReference<String>(); // request body captured by the test server
+ var reqUri = new AtomicReference<URI>(); // request URI captured by the test server
+ try (HttpTestServer receiver =
+ new HttpTestServer(
+ config.cacheInvalidationUri(),
+ exchange -> {
+ try (InputStream requestBody = exchange.getRequestBody()) {
+ body.set(new String(requestBody.readAllBytes(), UTF_8));
+ }
+ reqUri.set(exchange.getRequestURI());
+ exchange.sendResponseHeaders(204, 0);
+ exchange.getResponseBody().close();
+ })) {
+
+ var uri = receiver.getUri();
+
+ var sender =
+ new CacheInvalidationSender(vertx, config, uri.getPort(), senderId) {
+ @Override
+ Future<List<String>> resolveServiceNames(List<String>
serviceNames) {
+ return succeededFuture(List.of(uri.getHost())); // resolve to the test server's host
+ }
+ };
+
+ var future =
+ Future.all(sender.submit(expected, singletonList(uri.getHost())))
+ .toCompletionStage()
+ .toCompletableFuture();
+
+ soft.assertThat(future).succeedsWithin(30, TimeUnit.SECONDS);
+
+ soft.assertThat(body.get())
+ .isEqualTo(mapper.writeValueAsString(cacheInvalidations(expected)));
+
soft.assertThat(reqUri.get()).extracting(URI::getPath).isEqualTo("/foo/bar/");
+ soft.assertThat(reqUri.get())
+ .extracting(URI::getQuery)
+ .isEqualTo("sender=" + senderId.instanceId()); // the sender's instance id is expected as the query string
+ }
+ }
+
+ @Test
+ public void sendInvalidationTimeout() throws Exception { // a request that gets no response must fail with a timeout
+ var senderId = ServerInstanceId.of("senderId");
+
+ var token = "token";
+ var tokens = singletonList(token);
+
+ var serviceNames = singletonList("service-name");
+
+ var config =
+ buildConfig(
+ tokens, Optional.of(serviceNames), Duration.ofSeconds(10),
Duration.ofMillis(1)); // request timeout of 1 ms
+
+ var expected =
+ invalidations().map(args ->
args.get()[1]).map(CacheInvalidation.class::cast).toList();
+
+ try (var receiver =
+ new HttpTestServer(
+ config.cacheInvalidationUri(),
+ exchange -> {
+ try (InputStream requestBody = exchange.getRequestBody()) {
+ requestBody.readAllBytes();
+ }
+ // don't send a response -> provoke a timeout
+ exchange.getResponseBody().close();
+ })) {
+
+ var uri = receiver.getUri();
+
+ var sender =
+ new CacheInvalidationSender(vertx, config, uri.getPort(), senderId) {
+ @Override
+ Future<List<String>> resolveServiceNames(List<String>
serviceNames) {
+ return succeededFuture(List.of(uri.getHost())); // resolve to the test server's host
+ }
+ };
+
+ var future =
+ CompletableFuture.allOf(
+ sender.submit(expected, singletonList(uri.getHost())).stream()
+ .map(Future::toCompletionStage)
+ .map(CompletionStage::toCompletableFuture)
+ .toArray(CompletableFuture[]::new));
+
+ soft.assertThat(future)
+ .failsWithin(30, TimeUnit.SECONDS)
+ .withThrowableOfType(ExecutionException.class)
+ .withMessageContaining("Timeout 1 (ms) fired"); // the "1" matches the 1 ms request timeout configured above
+ }
+ }
+
+ static Stream<Arguments> invalidations() { // each case: the eviction call to perform, then the invalidation it is expected to produce
+ return Stream.of(
+ arguments(
+ (Consumer<DistributedCacheInvalidation>) i -> i.evictObj("repo",
SOME_OBJ_REF),
+ cacheInvalidationEvictObj("repo", SOME_OBJ_REF)),
+ arguments(
+ (Consumer<DistributedCacheInvalidation>) i ->
i.evictReference("repo", "refs/foo/bar"),
+ cacheInvalidationEvictReference("repo", "refs/foo/bar")));
+ }
+
+ private static QuarkusDistributedCacheInvalidationsConfig buildConfig( // mocked config; values not passed in are fixed below
+ List<String> tokens,
+ Optional<List<String>> serviceName,
+ Duration interval,
+ Duration requestTimeout) {
+ var config = mock(QuarkusDistributedCacheInvalidationsConfig.class);
+
when(config.cacheInvalidationValidTokens()).thenReturn(Optional.of(tokens));
+ when(config.cacheInvalidationServiceNames()).thenReturn(serviceName);
+
when(config.cacheInvalidationServiceNameLookupInterval()).thenReturn(interval);
+ when(config.cacheInvalidationBatchSize()).thenReturn(10); // fixed batch size for all tests
+ when(config.cacheInvalidationUri()).thenReturn("/foo/bar/"); // the path the HTTP-based tests assert on
+
when(config.cacheInvalidationRequestTimeout()).thenReturn(Optional.of(requestTimeout));
+ when(config.dnsQueryTimeout()).thenReturn(Duration.ofSeconds(5));
+ return config;
+ }
+}
diff --git
a/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestResolvConf.java
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestResolvConf.java
new file mode 100644
index 000000000..38a9555a6
--- /dev/null
+++
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestResolvConf.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.quarkus.distcache;
+
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Stream;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+@ExtendWith(SoftAssertionsExtension.class)
+public class TestResolvConf { // tests ResolvConf against inline resolv.conf content and the system file
+ @InjectSoftAssertions protected SoftAssertions soft;
+
+ @ParameterizedTest
+ @MethodSource // no name given: JUnit looks for a factory method named like the test, i.e. resolve()
+ public void resolve(
+ String resolvConfContent, List<InetSocketAddress> nameservers,
List<String> searchList)
+ throws Exception {
+ ResolvConf resolvConf =
+ ResolvConf.fromReader(new BufferedReader(new
StringReader(resolvConfContent)));
+ soft.assertThat(resolvConf)
+ .extracting(ResolvConf::getNameservers, ResolvConf::getSearchList)
+ .containsExactly(nameservers, searchList); // both lists must match exactly, in order
+ }
+
+ @Test
+ public void system() throws IOException {
+ String file = Files.readString(Paths.get("/etc/resolv.conf")); // raw content, used below to decide the search-list expectation
+
+ ResolvConf resolvConf = ResolvConf.system();
+ soft.assertThat(resolvConf.getNameservers()).isNotEmpty();
+ // This 'if' ensures that this test passes on the macOS test run in CI.
+ if (file.contains("\nsearch ") || file.startsWith("search ")) {
+ soft.assertThat(resolvConf.getSearchList()).isNotEmpty();
+ } else {
+ soft.assertThat(resolvConf.getSearchList()).isEmpty();
+ }
+ }
+
+ static Stream<Arguments> resolve() { // cases: resolv.conf content, expected nameservers, expected search list
+ return Stream.of(
+ arguments(
+ """
+ # See man:systemd-resolved.service(8) for details about the
supported modes of
+ # operation for /etc/resolv.conf.
+
+ nameserver 127.0.0.1
+ search search.domain
+ """,
+ List.of(new InetSocketAddress("127.0.0.1", 53)),
+ List.of("search.domain")),
+ arguments(
+ """
+ nameserver 127.0.0.1
+ nameserver 1.2.3.4
+ """,
+ List.of(new InetSocketAddress("127.0.0.1", 53), new
InetSocketAddress("1.2.3.4", 53)),
+ List.of()), // no search line: empty search list expected
+ arguments(
+ """
+ nameserver 127.0.0.1
+ nameserver 1.2.3.4
+ search search.domain
+ search anothersearch.anotherdomain
+ """,
+ List.of(new InetSocketAddress("127.0.0.1", 53), new
InetSocketAddress("1.2.3.4", 53)),
+ List.of("search.domain", "anothersearch.anotherdomain")), // two search lines: both domains expected
+ arguments(
+ """
+ nameserver 127.0.0.1
+ nameserver 1.2.3.4
+ search search.domain anothersearch.anotherdomain
+ """,
+ List.of(new InetSocketAddress("127.0.0.1", 53), new
InetSocketAddress("1.2.3.4", 53)),
+ List.of("search.domain", "anothersearch.anotherdomain")), // one search line with two domains: same expectation
+ arguments("", List.of(), List.of())); // empty content: both lists empty
+ }
+}
diff --git
a/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/resources/logback-test.xml
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..3cba413f2
--- /dev/null
+++
b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<configuration debug="false">
+ <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator"/> <!-- propagates logback level changes to java.util.logging -->
+ <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%date{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+ <root level="INFO"> <!-- default level; the named loggers below set their own levels -->
+ <appender-ref ref="console"/>
+ </root>
+ <logger name="org.jboss" level="INFO"/>
+ <logger name="org.apache.polaris" level="DEBUG"/>
+ <logger name="org.apache.polaris.persistence.nosql.metastore" level="TRACE"/>
+ <logger name="org.apache.polaris.persistence.nosql.quarkus.distcache"
level="TRACE"/> <!-- the package of the tests in this module -->
+</configuration>