This is an automated email from the ASF dual-hosted git repository.

jyothsnakonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new deace73e CASSSIDECAR-452: Avoid blocking the event loop in CdcPublisher event-bus handler (#345)
deace73e is described below

commit deace73e189f3eae6fd10f2a16b15bd92d2b2865
Author: Klose6 <[email protected]>
AuthorDate: Thu May 28 11:54:44 2026 -0700

    CASSSIDECAR-452: Avoid blocking the event loop in CdcPublisher event-bus handler (#345)
    
    patch by Yafeng Lu; reviewed by Jyothsna Konisa, Shailaja Koppu, Yifan Cai for CASSSIDECAR-452
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/sidecar/cdc/CdcPublisher.java |  45 +++++---
 .../cassandra/sidecar/cdc/CdcPublisherTests.java   | 116 +++++++++++++++++++++
 3 files changed, 148 insertions(+), 14 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index c402d446..2d68339f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.4.0
 -----
+ * Avoid blocking the event loop in CdcPublisher event-bus handler (CASSSIDECAR-452)
  * Added validations for Live migration configuration (CASSSIDECAR-470)
  * Route CDC events to topics by corresponding topic format configuration (CASSSIDECAR-453)
  * Add Docker Compose setup for local CDC demo (Cassandra → Sidecar → Kafka) (CASSSIDECAR-419)
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java
index d7db17bc..2fc6ac20 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java
@@ -186,13 +186,21 @@ public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
         {
             sidecarCdcStats.captureCdcConfigChange();
             // Execute restart on worker thread to avoid blocking event loop
-            executorPools.executeBlocking(() -> {
-                restart();
-                return null;
-            });
+            executorPools.runBlocking(
+                CdcPublisher.this::restart
+            ).onFailure(t -> handleAsyncFailure(ON_CDC_CONFIGURATION_CHANGED.address(), t));
         }
     }
 
+    /**
+     * Handle the unexpected error while processing event on worker pool.
+     */
+    private void handleAsyncFailure(String address, Throwable t)
+    {
+        LOGGER.error("Unexpected error while processing event '{}' on worker pool", address, t);
+        sidecarCdcStats.captureUnrecoverableCdcError(t);
+    }
+
     @SuppressWarnings("resource")
     private synchronized void run() throws IllegalStateException
     {
@@ -255,7 +263,7 @@ public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
         return isRunning;
     }
 
-    private synchronized void stop()
+    synchronized void stop()
     {
         if (!isRunning)
         {
@@ -335,26 +343,35 @@ public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
 
     // EventBus handlers
     @Override
-    public synchronized void handle(Message<Object> msg)
+    public void handle(Message<Object> msg)
     {
-        if (msg.address().equals(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address()))
+        String address = msg.address();
+        if (address.equals(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address()))
         {
-            handleTokenRangeChange();
+            executorPools.runBlocking(CdcPublisher.this::handleTokenRangeChange)
+                         .onFailure(t -> handleAsyncFailure(address, t));
         }
-        else if (msg.address().equals(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_GAINED.address()))
+        else if (address.equals(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_GAINED.address()))
         {
-            handleRangeGained((RangeManager.RangeChangeEvent) msg.body());
+            RangeManager.RangeChangeEvent event = (RangeManager.RangeChangeEvent) msg.body();
+            executorPools.runBlocking(() -> handleRangeGained(event))
+                         .onFailure(t -> handleAsyncFailure(address, t));
         }
-        else if (msg.address().equals(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_LOST.address()))
+        else if (address.equals(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_LOST.address()))
         {
-            handleRangeLost((RangeManager.RangeChangeEvent) msg.body());
+            RangeManager.RangeChangeEvent event = (RangeManager.RangeChangeEvent) msg.body();
+            executorPools.runBlocking(() -> handleRangeLost(event))
+                         .onFailure(t -> handleAsyncFailure(address, t));
         }
-        else if (msg.address().equals(ON_SERVER_STOP.address()))
+        else if (address.equals(ON_SERVER_STOP.address()))
         {
+            // Run stop() on the event loop so it completes before the loop closes during shutdown;
+            // stopConsumers() only signals the SidecarCdc iterators (not heavy work).
             stop();
         }
-        else if (msg.address().equals(ON_CDC_CACHE_WARMED_UP.address()))
+        else if (address.equals(ON_CDC_CACHE_WARMED_UP.address()))
         {
+            // Single volatile write - safe on the event loop
             cdcCacheWarmedUp = true;
         }
     }
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java
index 045b2f9f..94d20037 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java
@@ -29,7 +29,9 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
 import com.google.inject.Provider;
+import io.vertx.core.Future;
 import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
 import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.bridge.CassandraVersion;
 import org.apache.cassandra.cdc.api.CdcOptions;
@@ -42,6 +44,7 @@ import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
 import org.apache.cassandra.cdc.stats.ICdcStats;
 import org.apache.cassandra.sidecar.bridge.CassandraBridgeFactory;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.server.ThrowingRunnable;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
 import org.apache.cassandra.sidecar.coordination.RangeManager;
@@ -49,11 +52,14 @@ import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
 import org.apache.cassandra.sidecar.db.CdcSystemViewsDatabaseAccessor;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
+import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CACHE_WARMED_UP;
+import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -61,6 +67,8 @@ import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -113,6 +121,8 @@ public class CdcPublisherTests
 
         // Mock ExecutorPools behavior
         when(executorPools.internal()).thenReturn(taskExecutorPool);
+        // Return a real successfully-completed Future so .onFailure(...) chaining in handle() is a safe no-op.
+        when(taskExecutorPool.runBlocking(any(ThrowingRunnable.class))).thenReturn(Future.succeededFuture(null));
 
         // Mock Vertx EventBus for event listeners
         when(vertx.eventBus()).thenReturn(mock(io.vertx.core.eventbus.EventBus.class, RETURNS_DEEP_STUBS));
@@ -203,4 +213,110 @@ public class CdcPublisherTests
         when(cassandraBridgeFactory.get(anyString())).thenReturn(mockBridge);
         when(kafkaProducerFactory.create(any())).thenReturn(mock(KafkaProducer.class));
     }
+
+
+    @Test
+    void testHandleTokenRangeChangedDispatchesToWorkerPool() throws Exception
+    {
+        Message<Object> msg = mock(Message.class);
+        when(msg.address()).thenReturn(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address());
+
+        CdcPublisher spyPublisher = spy(cdcPublisher);
+        spyPublisher.handle(msg);
+
+        ArgumentCaptor<ThrowingRunnable> captor = ArgumentCaptor.forClass(ThrowingRunnable.class);
+        verify(taskExecutorPool).runBlocking(captor.capture());
+
+        captor.getValue().run();
+        verify(spyPublisher).handleTokenRangeChange();
+    }
+
+    @Test
+    void testHandleTokenRangeGainedPassesEventToHandler() throws Exception
+    {
+        Message<Object> msg = mock(Message.class);
+        when(msg.address()).thenReturn(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_GAINED.address());
+        RangeManager.RangeChangeEvent event = mock(RangeManager.RangeChangeEvent.class);
+        when(msg.body()).thenReturn(event);
+
+        CdcPublisher spyPublisher = spy(cdcPublisher);
+        spyPublisher.handle(msg);
+
+        ArgumentCaptor<ThrowingRunnable> captor = ArgumentCaptor.forClass(ThrowingRunnable.class);
+        verify(taskExecutorPool).runBlocking(captor.capture());
+
+        captor.getValue().run();
+        verify(spyPublisher).handleRangeGained(event);
+    }
+
+    @Test
+    void testHandleTokenRangeLostPassesEventToHandler() throws Exception
+    {
+        Message<Object> msg = mock(Message.class);
+        when(msg.address()).thenReturn(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_LOST.address());
+        RangeManager.RangeChangeEvent event = mock(RangeManager.RangeChangeEvent.class);
+        when(msg.body()).thenReturn(event);
+
+        CdcPublisher spyPublisher = spy(cdcPublisher);
+        spyPublisher.handle(msg);
+
+        ArgumentCaptor<ThrowingRunnable> captor = ArgumentCaptor.forClass(ThrowingRunnable.class);
+        verify(taskExecutorPool).runBlocking(captor.capture());
+
+        captor.getValue().run();
+        verify(spyPublisher).handleRangeLost(event);
+    }
+
+    @Test
+    void testHandleServerStopRunsStopOnEventLoopWithoutWorkerPool()
+    {
+        Message<Object> msg = mock(Message.class);
+        when(msg.address()).thenReturn(ON_SERVER_STOP.address());
+
+        cdcPublisher = spy(cdcPublisher);
+        cdcPublisher.handle(msg);
+
+        // stop() must run synchronously on the event loop so shutdown completes
+        // before the loop closes; no worker-pool dispatch should occur.
+        verify(cdcPublisher).stop();
+        verify(taskExecutorPool, never()).runBlocking(any(ThrowingRunnable.class));
+    }
+
+    @Test
+    void testHandleCdcCacheWarmedUpRunsOnEventLoopWithoutWorkerPool()
+    {
+        Message<Object> msg = mock(Message.class);
+        when(msg.address()).thenReturn(ON_CDC_CACHE_WARMED_UP.address());
+
+        cdcPublisher.handle(msg);
+
+        verify(taskExecutorPool, never()).runBlocking(any(ThrowingRunnable.class));
+    }
+
+    @Test
+    void testHandleUnknownAddressIsNoOp()
+    {
+        Message<Object> msg = mock(Message.class);
+        when(msg.address()).thenReturn("unknown.address");
+
+        cdcPublisher.handle(msg);
+
+        verify(taskExecutorPool, never()).runBlocking(any(ThrowingRunnable.class));
+    }
+
+    @Test
+    void testHandleWorkerPoolFailureIsLoggedAndCaptured()
+    {
+        RuntimeException boom = new RuntimeException("boom");
+        when(taskExecutorPool.runBlocking(any(ThrowingRunnable.class))).thenReturn(Future.failedFuture(boom));
+
+        Message<Object> msg = mock(Message.class);
+        when(msg.address()).thenReturn(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address());
+
+        cdcPublisher.handle(msg);
+
+        // The discarded Future's failure path must reach our backstop:
+        // captureUnrecoverableCdcError so the failure is observable in metrics.
+        verify(sidecarCdcStats).captureUnrecoverableCdcError(boom);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to