This is an automated email from the ASF dual-hosted git repository.
adutra pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new 4e2facbb4 Introduce alternate in-memory buffering event listener (#2574)
4e2facbb4 is described below
commit 4e2facbb4b743b4d54c1d39aaf6fc290960eca7e
Author: Alexandre Dutra <[email protected]>
AuthorDate: Fri Sep 19 11:05:01 2025 +0200
Introduce alternate in-memory buffering event listener (#2574)
---
runtime/service/build.gradle.kts | 11 +-
...emoryBufferPolarisPersistenceEventListener.java | 6 +-
.../listeners/PolarisPersistenceEventListener.java | 8 +-
.../listeners/inmemory/InMemoryEventListener.java | 141 +++++++++++++++++++++
.../InMemoryEventListenerBufferSizeTest.java | 100 +++++++++++++++
.../InMemoryEventListenerBufferTimeTest.java | 54 ++++++++
.../inmemory/InMemoryEventListenerTestBase.java | 117 +++++++++++++++++
7 files changed, 425 insertions(+), 12 deletions(-)
diff --git a/runtime/service/build.gradle.kts b/runtime/service/build.gradle.kts
index 0fc787e0d..095a68468 100644
--- a/runtime/service/build.gradle.kts
+++ b/runtime/service/build.gradle.kts
@@ -129,22 +129,23 @@ dependencies {
testImplementation("io.quarkus:quarkus-junit5-mockito")
testImplementation("io.quarkus:quarkus-rest-client")
testImplementation("io.quarkus:quarkus-rest-client-jackson")
+ testImplementation("io.quarkus:quarkus-jdbc-h2")
+
testImplementation("io.rest-assured:rest-assured")
+
testImplementation(libs.localstack)
- testImplementation("org.testcontainers:testcontainers")
+
+ testImplementation(project(":polaris-runtime-test-common"))
testImplementation(project(":polaris-container-spec-helper"))
testImplementation(libs.threeten.extra)
testImplementation(libs.hawkular.agent.prometheus.scraper)
- testImplementation(project(":polaris-runtime-test-common"))
-
- testImplementation("io.quarkus:quarkus-junit5")
testImplementation(libs.awaitility)
+
testImplementation(platform(libs.testcontainers.bom))
testImplementation("org.testcontainers:testcontainers")
testImplementation("org.testcontainers:postgresql")
- testImplementation("org.postgresql:postgresql")
testFixturesImplementation(project(":polaris-core"))
testFixturesImplementation(project(":polaris-api-management-model"))
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java
index 8dc425ad0..481c599bc 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java
@@ -127,7 +127,7 @@ public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersis
@Nullable
@Override
- String getRequestId() {
+ protected String getRequestId() {
if (containerRequestContext != null &&
containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
}
@@ -135,7 +135,7 @@ public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersis
}
@Override
- void processEvent(PolarisEvent polarisEvent) {
+ protected void processEvent(PolarisEvent polarisEvent) {
String realmId = callContext.getRealmContext().getRealmIdentifier();
ConcurrentLinkedQueueWithApproximateSize<PolarisEvent> realmQueue =
@@ -192,7 +192,7 @@ public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersis
}
@Override
- ContextSpecificInformation getContextSpecificInformation() {
+ protected ContextSpecificInformation getContextSpecificInformation() {
return new ContextSpecificInformation(
clock.millis(),
securityContext.getUserPrincipal() == null
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java
index e9d43f003..11771797a 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java
@@ -90,12 +90,12 @@ public abstract class PolarisPersistenceEventListener extends PolarisEventListen
processEvent(polarisEvent);
}
- protected record ContextSpecificInformation(long timestamp, @Nullable String
principalName) {}
+ public record ContextSpecificInformation(long timestamp, @Nullable String
principalName) {}
- abstract ContextSpecificInformation getContextSpecificInformation();
+ protected abstract ContextSpecificInformation
getContextSpecificInformation();
@Nullable
- abstract String getRequestId();
+ protected abstract String getRequestId();
- abstract void processEvent(PolarisEvent event);
+ protected abstract void processEvent(PolarisEvent event);
}
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListener.java
new file mode 100644
index 000000000..8af66b194
--- /dev/null
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListener.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners.inmemory;
+
+import static
org.apache.polaris.service.logging.LoggingMDCFilter.REQUEST_ID_KEY;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import io.smallrye.mutiny.infrastructure.Infrastructure;
+import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
+import jakarta.annotation.Nullable;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.List;
+import java.util.Objects;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import
org.apache.polaris.service.events.listeners.InMemoryBufferEventListenerConfiguration;
+import
org.apache.polaris.service.events.listeners.PolarisPersistenceEventListener;
+import org.eclipse.microprofile.faulttolerance.Fallback;
+import org.eclipse.microprofile.faulttolerance.Retry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ApplicationScoped
+@Identifier("persistence-in-memory")
+public class InMemoryEventListener extends PolarisPersistenceEventListener {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InMemoryEventListener.class);
+
+ @Inject CallContext callContext;
+ @Inject Clock clock;
+ @Inject MetaStoreManagerFactory metaStoreManagerFactory;
+ @Inject InMemoryBufferEventListenerConfiguration configuration;
+
+ @Context SecurityContext securityContext;
+ @Context ContainerRequestContext requestContext;
+
+ @VisibleForTesting
+ final LoadingCache<String, UnicastProcessor<PolarisEvent>> processors =
+ Caffeine.newBuilder()
+ .expireAfterAccess(Duration.ofHours(1))
+ .evictionListener(
+ (String realmId, UnicastProcessor<?> processor, RemovalCause
cause) ->
+ processor.onComplete())
+ .build(this::createProcessor);
+
+ @Override
+ protected void processEvent(PolarisEvent event) {
+ var realmId = callContext.getRealmContext().getRealmIdentifier();
+ processEvent(realmId, event);
+ }
+
+ protected void processEvent(String realmId, PolarisEvent event) {
+ var processor = Objects.requireNonNull(processors.get(realmId));
+ processor.onNext(event);
+ }
+
+ @Override
+ protected ContextSpecificInformation getContextSpecificInformation() {
+ var principal = securityContext.getUserPrincipal();
+ var principalName = principal == null ? null : principal.getName();
+ return new ContextSpecificInformation(clock.millis(), principalName);
+ }
+
+ @Nullable
+ @Override
+ protected String getRequestId() {
+ return (String) requestContext.getProperty(REQUEST_ID_KEY);
+ }
+
+ @PreDestroy
+ public void shutdown() {
+ processors.asMap().values().forEach(UnicastProcessor::onComplete);
+ processors.invalidateAll(); // doesn't call the eviction listener
+ }
+
+ protected UnicastProcessor<PolarisEvent> createProcessor(String realmId) {
+ UnicastProcessor<PolarisEvent> processor = UnicastProcessor.create();
+ processor
+ .emitOn(Infrastructure.getDefaultWorkerPool())
+ .group()
+ .intoLists()
+ .of(configuration.maxBufferSize(), configuration.bufferTime())
+ .subscribe()
+ .with(events -> flush(realmId, events), error ->
onProcessorError(realmId, error));
+ return processor;
+ }
+
+ @Retry(maxRetries = 5, delay = 1000, jitter = 100)
+ @Fallback(fallbackMethod = "onFlushError")
+ protected void flush(String realmId, List<PolarisEvent> events) {
+ RealmContext realmContext = () -> realmId;
+ var metaStoreManager =
metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);
+ var basePersistence =
metaStoreManagerFactory.getOrCreateSession(realmContext);
+ var callContext = new PolarisCallContext(realmContext, basePersistence);
+ metaStoreManager.writeEvents(callContext, events);
+ }
+
+ @SuppressWarnings("unused")
+ protected void onFlushError(String realmId, List<PolarisEvent> events,
Throwable error) {
+ LOGGER.error("Failed to persist {} events for realm '{}'", events.size(),
realmId, error);
+ }
+
+ protected void onProcessorError(String realmId, Throwable error) {
+ LOGGER.error(
+ "Unexpected error while processing events for realm '{}'; some events
may have been dropped",
+ realmId,
+ error);
+ processors.invalidate(realmId);
+ }
+}
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferSizeTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferSizeTest.java
new file mode 100644
index 000000000..1cb76cdcf
--- /dev/null
+++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferSizeTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners.inmemory;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+
+import com.google.common.collect.ImmutableMap;
+import io.quarkus.test.junit.QuarkusTest;
+import io.quarkus.test.junit.QuarkusTestProfile;
+import io.quarkus.test.junit.TestProfile;
+import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
+import io.smallrye.mutiny.subscription.BackPressureFailure;
+import java.util.Map;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.mockito.Mockito;
+
+@QuarkusTest
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@TestProfile(InMemoryEventListenerBufferSizeTest.Profile.class)
+class InMemoryEventListenerBufferSizeTest extends
InMemoryEventListenerTestBase {
+
+ public static class Profile implements QuarkusTestProfile {
+
+ @Override
+ public Map<String, String> getConfigOverrides() {
+ return ImmutableMap.<String, String>builder()
+ .putAll(BASE_CONFIG)
+
.put("polaris.event-listener.persistence-in-memory-buffer.buffer-time", "60s")
+
.put("polaris.event-listener.persistence-in-memory-buffer.max-buffer-size",
"10")
+ .build();
+ }
+ }
+
+ @Test
+ void testFlushOnSize() {
+ sendAsync("test1", 10);
+ sendAsync("test2", 10);
+ assertRows("test1", 10);
+ assertRows("test2", 10);
+ }
+
+ @Test
+ void testFlushOnShutdown() {
+ producer.processEvent("test1", event());
+ producer.processEvent("test2", event());
+ producer.shutdown();
+ assertRows("test1", 1);
+ assertRows("test2", 1);
+ }
+
+ @Test
+ void testFlushFailureRecovery() {
+ var manager = Mockito.mock(PolarisMetaStoreManager.class);
+
doReturn(manager).when(metaStoreManagerFactory).getOrCreateMetaStoreManager(any());
+ RuntimeException error = new RuntimeException("error");
+ doThrow(error)
+ .doThrow(error) // first batch will give up after 2 attempts
+ .doThrow(error)
+ .doCallRealMethod() // second batch will succeed on the 2nd attempt
+ .when(manager)
+ .writeEvents(any(), any());
+ sendAsync("test1", 20);
+ assertRows("test1", 10);
+ }
+
+ @Test
+ void testProcessorFailureRecovery() {
+ producer.processEvent("test1", event());
+ UnicastProcessor<PolarisEvent> test1 = producer.processors.get("test1");
+ assertThat(test1).isNotNull();
+ // emulate backpressure error; will drop the event and invalidate the processor
+ test1.onError(new BackPressureFailure("error"));
+ // will create a new processor and recover
+ sendAsync("test1", 10);
+ assertRows("test1", 10);
+ }
+}
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferTimeTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferTimeTest.java
new file mode 100644
index 000000000..8431274ea
--- /dev/null
+++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferTimeTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners.inmemory;
+
+import com.google.common.collect.ImmutableMap;
+import io.quarkus.test.junit.QuarkusTest;
+import io.quarkus.test.junit.QuarkusTestProfile;
+import io.quarkus.test.junit.TestProfile;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+@QuarkusTest
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@TestProfile(InMemoryEventListenerBufferTimeTest.Profile.class)
+class InMemoryEventListenerBufferTimeTest extends
InMemoryEventListenerTestBase {
+
+ public static class Profile implements QuarkusTestProfile {
+
+ @Override
+ public Map<String, String> getConfigOverrides() {
+ return ImmutableMap.<String, String>builder()
+ .putAll(BASE_CONFIG)
+
.put("polaris.event-listener.persistence-in-memory-buffer.buffer-time", "100ms")
+
.put("polaris.event-listener.persistence-in-memory-buffer.max-buffer-size",
"1000")
+ .build();
+ }
+ }
+
+ @Test
+ void testFlushOnTimeout() {
+ sendAsync("test1", 5);
+ sendAsync("test2", 1);
+ assertRows("test1", 5);
+ assertRows("test2", 1);
+ }
+}
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerTestBase.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerTestBase.java
new file mode 100644
index 000000000..527491b56
--- /dev/null
+++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerTestBase.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners.inmemory;
+
+import static org.apache.polaris.core.entity.PolarisEvent.ResourceType.CATALOG;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.Mockito.reset;
+
+import com.google.common.collect.ImmutableMap;
+import io.netty.channel.EventLoopGroup;
+import io.quarkus.netty.MainEventLoopGroup;
+import io.quarkus.test.junit.mockito.InjectSpy;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.enterprise.inject.Instance;
+import jakarta.inject.Inject;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Map;
+import java.util.UUID;
+import javax.sql.DataSource;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.junit.jupiter.api.AfterEach;
+
+abstract class InMemoryEventListenerTestBase {
+
+ static final Map<String, String> BASE_CONFIG =
+ ImmutableMap.<String, String>builder()
+ .put("polaris.realm-context.realms", "test1,test2")
+ .put("polaris.persistence.type", "relational-jdbc")
+ .put("polaris.persistence.auto-bootstrap-types", "relational-jdbc")
+ .put("quarkus.datasource.db-kind", "h2")
+ .put(
+ "quarkus.datasource.jdbc.url",
+
"jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE")
+ .put("polaris.event-listener.type", "persistence-in-memory")
+ .put(
+
"quarkus.fault-tolerance.\"org.apache.polaris.service.events.listeners.inmemory.InMemoryEventListener/flush\".retry.max-retries",
+ "1")
+ .put(
+
"quarkus.fault-tolerance.\"org.apache.polaris.service.events.listeners.inmemory.InMemoryEventListener/flush\".retry.delay",
+ "10")
+ .build();
+
+ @Inject
+ @Identifier("persistence-in-memory")
+ InMemoryEventListener producer;
+
+ @InjectSpy
+ @Identifier("relational-jdbc")
+ @SuppressWarnings("CdiInjectionPointsInspection")
+ MetaStoreManagerFactory metaStoreManagerFactory;
+
+ @Inject
+ @MainEventLoopGroup
+ @SuppressWarnings("CdiInjectionPointsInspection")
+ EventLoopGroup eventLoopGroup;
+
+ @Inject Instance<DataSource> dataSource;
+
+ @AfterEach
+ void clearEvents() throws Exception {
+ reset(metaStoreManagerFactory);
+ producer.shutdown();
+ try (Connection connection = dataSource.get().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("DELETE FROM polaris_schema.events");
+ }
+ }
+
+ void sendAsync(String realmId, int n) {
+ for (int i = 0; i < n; i++) {
+ eventLoopGroup.next().execute(() -> producer.processEvent(realmId,
event()));
+ }
+ }
+
+ @SuppressWarnings("SqlSourceToSinkFlow")
+ void assertRows(String realmId, int expected) {
+ String query = "SELECT COUNT(*) FROM polaris_schema.events WHERE realm_id
= '" + realmId + "'";
+ await()
+ .atMost(Duration.ofSeconds(10))
+ .untilAsserted(
+ () -> {
+ try (Connection connection = dataSource.get().getConnection();
+ Statement statement = connection.createStatement();
+ ResultSet rs = statement.executeQuery(query)) {
+ rs.next();
+ assertThat(rs.getInt(1)).isEqualTo(expected);
+ }
+ });
+ }
+
+ static PolarisEvent event() {
+ String id = UUID.randomUUID().toString();
+ return new PolarisEvent("test", id, null, "test", 0, null, CATALOG,
"test");
+ }
+}