This is an automated email from the ASF dual-hosted git repository.
jyothsnakonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new deace73e CASSSIDECAR-452: Avoid blocking the event loop in CdcPublisher event-bus handler (#345)
deace73e is described below
commit deace73e189f3eae6fd10f2a16b15bd92d2b2865
Author: Klose6 <[email protected]>
AuthorDate: Thu May 28 11:54:44 2026 -0700
CASSSIDECAR-452: Avoid blocking the event loop in CdcPublisher event-bus handler (#345)
patch by Yafeng Lu; reviewed by Jyothsna Konisa, Shailaja Koppu, Yifan Cai
for CASSSIDECAR-452
---
CHANGES.txt | 1 +
.../apache/cassandra/sidecar/cdc/CdcPublisher.java | 45 +++++---
.../cassandra/sidecar/cdc/CdcPublisherTests.java | 116 +++++++++++++++++++++
3 files changed, 148 insertions(+), 14 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index c402d446..2d68339f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.4.0
-----
+ * Avoid blocking the event loop in CdcPublisher event-bus handler (CASSSIDECAR-452)
* Added validations for Live migration configuration (CASSSIDECAR-470)
* Route CDC events to topics by corresponding topic format configuration (CASSSIDECAR-453)
* Add Docker Compose setup for local CDC demo (Cassandra → Sidecar → Kafka) (CASSSIDECAR-419)
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java
index d7db17bc..2fc6ac20 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java
@@ -186,13 +186,21 @@ public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
{
sidecarCdcStats.captureCdcConfigChange();
// Execute restart on worker thread to avoid blocking event loop
- executorPools.executeBlocking(() -> {
- restart();
- return null;
- });
+ executorPools.runBlocking(
+ CdcPublisher.this::restart
+ ).onFailure(t ->
handleAsyncFailure(ON_CDC_CONFIGURATION_CHANGED.address(), t));
}
}
+ /**
+ * Handle the unexpected error while processing event on worker pool.
+ */
+ private void handleAsyncFailure(String address, Throwable t)
+ {
+ LOGGER.error("Unexpected error while processing event '{}' on worker
pool", address, t);
+ sidecarCdcStats.captureUnrecoverableCdcError(t);
+ }
+
@SuppressWarnings("resource")
private synchronized void run() throws IllegalStateException
{
@@ -255,7 +263,7 @@ public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
return isRunning;
}
- private synchronized void stop()
+ synchronized void stop()
{
if (!isRunning)
{
@@ -335,26 +343,35 @@ public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
// EventBus handlers
@Override
- public synchronized void handle(Message<Object> msg)
+ public void handle(Message<Object> msg)
{
- if
(msg.address().equals(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address()))
+ String address = msg.address();
+ if
(address.equals(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address()))
{
- handleTokenRangeChange();
+
executorPools.runBlocking(CdcPublisher.this::handleTokenRangeChange)
+ .onFailure(t -> handleAsyncFailure(address, t));
}
- else if
(msg.address().equals(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_GAINED.address()))
+ else if
(address.equals(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_GAINED.address()))
{
- handleRangeGained((RangeManager.RangeChangeEvent) msg.body());
+ RangeManager.RangeChangeEvent event =
(RangeManager.RangeChangeEvent) msg.body();
+ executorPools.runBlocking(() -> handleRangeGained(event))
+ .onFailure(t -> handleAsyncFailure(address, t));
}
- else if
(msg.address().equals(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_LOST.address()))
+ else if
(address.equals(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_LOST.address()))
{
- handleRangeLost((RangeManager.RangeChangeEvent) msg.body());
+ RangeManager.RangeChangeEvent event =
(RangeManager.RangeChangeEvent) msg.body();
+ executorPools.runBlocking(() -> handleRangeLost(event))
+ .onFailure(t -> handleAsyncFailure(address, t));
}
- else if (msg.address().equals(ON_SERVER_STOP.address()))
+ else if (address.equals(ON_SERVER_STOP.address()))
{
+ // Run stop() on the event loop so it completes before the loop
closes during shutdown;
+ // stopConsumers() only signals the SidecarCdc iterators (not
heavy work).
stop();
}
- else if (msg.address().equals(ON_CDC_CACHE_WARMED_UP.address()))
+ else if (address.equals(ON_CDC_CACHE_WARMED_UP.address()))
{
+ // Single volatile write - safe on the event loop
cdcCacheWarmedUp = true;
}
}
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java
index 045b2f9f..94d20037 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java
@@ -29,7 +29,9 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import com.google.inject.Provider;
+import io.vertx.core.Future;
import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
import org.apache.cassandra.bridge.CassandraBridge;
import org.apache.cassandra.bridge.CassandraVersion;
import org.apache.cassandra.cdc.api.CdcOptions;
@@ -42,6 +44,7 @@ import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
import org.apache.cassandra.cdc.stats.ICdcStats;
import org.apache.cassandra.sidecar.bridge.CassandraBridgeFactory;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.server.ThrowingRunnable;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
import org.apache.cassandra.sidecar.coordination.RangeManager;
@@ -49,11 +52,14 @@ import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
import org.apache.cassandra.sidecar.db.CdcSystemViewsDatabaseAccessor;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
+import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CACHE_WARMED_UP;
+import static
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
@@ -61,6 +67,8 @@ import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
@@ -113,6 +121,8 @@ public class CdcPublisherTests
// Mock ExecutorPools behavior
when(executorPools.internal()).thenReturn(taskExecutorPool);
+ // Return a real successfully-completed Future so .onFailure(...)
chaining in handle() is a safe no-op.
+
when(taskExecutorPool.runBlocking(any(ThrowingRunnable.class))).thenReturn(Future.succeededFuture(null));
// Mock Vertx EventBus for event listeners
when(vertx.eventBus()).thenReturn(mock(io.vertx.core.eventbus.EventBus.class,
RETURNS_DEEP_STUBS));
@@ -203,4 +213,110 @@ public class CdcPublisherTests
when(cassandraBridgeFactory.get(anyString())).thenReturn(mockBridge);
when(kafkaProducerFactory.create(any())).thenReturn(mock(KafkaProducer.class));
}
+
+
+ @Test
+ void testHandleTokenRangeChangedDispatchesToWorkerPool() throws Exception
+ {
+ Message<Object> msg = mock(Message.class);
+
when(msg.address()).thenReturn(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address());
+
+ CdcPublisher spyPublisher = spy(cdcPublisher);
+ spyPublisher.handle(msg);
+
+ ArgumentCaptor<ThrowingRunnable> captor =
ArgumentCaptor.forClass(ThrowingRunnable.class);
+ verify(taskExecutorPool).runBlocking(captor.capture());
+
+ captor.getValue().run();
+ verify(spyPublisher).handleTokenRangeChange();
+ }
+
+ @Test
+ void testHandleTokenRangeGainedPassesEventToHandler() throws Exception
+ {
+ Message<Object> msg = mock(Message.class);
+
when(msg.address()).thenReturn(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_GAINED.address());
+ RangeManager.RangeChangeEvent event =
mock(RangeManager.RangeChangeEvent.class);
+ when(msg.body()).thenReturn(event);
+
+ CdcPublisher spyPublisher = spy(cdcPublisher);
+ spyPublisher.handle(msg);
+
+ ArgumentCaptor<ThrowingRunnable> captor =
ArgumentCaptor.forClass(ThrowingRunnable.class);
+ verify(taskExecutorPool).runBlocking(captor.capture());
+
+ captor.getValue().run();
+ verify(spyPublisher).handleRangeGained(event);
+ }
+
+ @Test
+ void testHandleTokenRangeLostPassesEventToHandler() throws Exception
+ {
+ Message<Object> msg = mock(Message.class);
+
when(msg.address()).thenReturn(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_LOST.address());
+ RangeManager.RangeChangeEvent event =
mock(RangeManager.RangeChangeEvent.class);
+ when(msg.body()).thenReturn(event);
+
+ CdcPublisher spyPublisher = spy(cdcPublisher);
+ spyPublisher.handle(msg);
+
+ ArgumentCaptor<ThrowingRunnable> captor =
ArgumentCaptor.forClass(ThrowingRunnable.class);
+ verify(taskExecutorPool).runBlocking(captor.capture());
+
+ captor.getValue().run();
+ verify(spyPublisher).handleRangeLost(event);
+ }
+
+ @Test
+ void testHandleServerStopRunsStopOnEventLoopWithoutWorkerPool()
+ {
+ Message<Object> msg = mock(Message.class);
+ when(msg.address()).thenReturn(ON_SERVER_STOP.address());
+
+ cdcPublisher = spy(cdcPublisher);
+ cdcPublisher.handle(msg);
+
+ // stop() must run synchronously on the event loop so shutdown
completes
+ // before the loop closes; no worker-pool dispatch should occur.
+ verify(cdcPublisher).stop();
+ verify(taskExecutorPool,
never()).runBlocking(any(ThrowingRunnable.class));
+ }
+
+ @Test
+ void testHandleCdcCacheWarmedUpRunsOnEventLoopWithoutWorkerPool()
+ {
+ Message<Object> msg = mock(Message.class);
+ when(msg.address()).thenReturn(ON_CDC_CACHE_WARMED_UP.address());
+
+ cdcPublisher.handle(msg);
+
+ verify(taskExecutorPool,
never()).runBlocking(any(ThrowingRunnable.class));
+ }
+
+ @Test
+ void testHandleUnknownAddressIsNoOp()
+ {
+ Message<Object> msg = mock(Message.class);
+ when(msg.address()).thenReturn("unknown.address");
+
+ cdcPublisher.handle(msg);
+
+ verify(taskExecutorPool,
never()).runBlocking(any(ThrowingRunnable.class));
+ }
+
+ @Test
+ void testHandleWorkerPoolFailureIsLoggedAndCaptured()
+ {
+ RuntimeException boom = new RuntimeException("boom");
+
when(taskExecutorPool.runBlocking(any(ThrowingRunnable.class))).thenReturn(Future.failedFuture(boom));
+
+ Message<Object> msg = mock(Message.class);
+
when(msg.address()).thenReturn(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address());
+
+ cdcPublisher.handle(msg);
+
+ // The discarded Future's failure path must reach our backstop:
+ // captureUnrecoverableCdcError so the failure is observable in
metrics.
+ verify(sidecarCdcStats).captureUnrecoverableCdcError(boom);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]