This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c1260886 CASSSIDECAR-172: Adding support for CDC APIs into sidecar 
client (#158)
c1260886 is described below

commit c1260886642e789fa15a3fb900251bea529fcc09
Author: Jyothsna konisa <[email protected]>
AuthorDate: Sat Dec 14 12:39:41 2024 -0800

    CASSSIDECAR-172: Adding support for CDC APIs into sidecar client (#158)
    
    Patch by Jyothsna Konisa; Reviewed by Bernardo Botella, Francisco Guerrero, 
James Berragan, Yifan Cai for CASSSIDECAR-172
---
 CHANGES.txt                                        |  1 +
 .../common/request/ListCdcSegmentsRequest.java     | 40 ++++++++++++
 .../common/request/StreamCdcSegmentRequest.java    | 54 ++++++++++++++++
 .../common/response/ListCdcSegmentsResponse.java   | 22 +++++++
 .../common/response/data/CdcSegmentInfo.java       | 24 +++++++
 .../cassandra/sidecar/client/SidecarClient.java    | 38 +++++++++++
 .../sidecar/client/SidecarClientTest.java          | 74 ++++++++++++++++++++++
 7 files changed, 253 insertions(+)

diff --git a/CHANGES.txt b/CHANGES.txt
index 70cabce9..d78e60ac 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -117,3 +117,4 @@
  * Add support for SSL and bindable address (CASSANDRA-15030)
  * Autogenerate API docs for sidecar (CASSANDRA-15028)
  * C* Management process (CASSANDRA-14395)
+ * Adding support for CDC APIs into sidecar client (CASSSIDECAR-172)
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ListCdcSegmentsRequest.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ListCdcSegmentsRequest.java
new file mode 100644
index 00000000..90331d98
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ListCdcSegmentsRequest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.sidecar.common.request;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.response.ListCdcSegmentsResponse;
+
+/**
+ * Represents a request for listing commit log files on an instance
+ */
+public class ListCdcSegmentsRequest extends 
JsonRequest<ListCdcSegmentsResponse>
+{
+    public ListCdcSegmentsRequest()
+    {
+        super(ApiEndpointsV1.LIST_CDC_SEGMENTS_ROUTE);
+    }
+
+    @Override
+    public HttpMethod method()
+    {
+        return HttpMethod.GET;
+    }
+}
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/StreamCdcSegmentRequest.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/StreamCdcSegmentRequest.java
new file mode 100644
index 00000000..fe1ac54b
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/StreamCdcSegmentRequest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.sidecar.common.request;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.utils.HttpRange;
+
+/**
+ * Represents a request to stream CDC segments(commit logs) on an instance.
+ */
+public class StreamCdcSegmentRequest extends Request
+{
+    private final HttpRange range;
+
+    public StreamCdcSegmentRequest(String segment, HttpRange range)
+    {
+        super(requestURI(segment));
+        this.range = range;
+    }
+
+    @Override
+    public HttpMethod method()
+    {
+        return HttpMethod.GET;
+    }
+
+    @Override
+    protected HttpRange range()
+    {
+        return range;
+    }
+
+    private static String requestURI(String segment)
+    {
+        return 
ApiEndpointsV1.STREAM_CDC_SEGMENTS_ROUTE.replaceAll(ApiEndpointsV1.SEGMENT_PATH_PARAM,
 segment);
+    }
+}
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponse.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponse.java
index 8b72cb2b..27d66f8b 100644
--- 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponse.java
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponse.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.sidecar.common.response;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -65,4 +66,25 @@ public class ListCdcSegmentsResponse
     {
         return segmentInfos;
     }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+        ListCdcSegmentsResponse that = (ListCdcSegmentsResponse) o;
+        return port == that.port && Objects.equals(host, that.host) && 
Objects.equals(segmentInfos, that.segmentInfos);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(host, port, segmentInfos);
+    }
 }
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/CdcSegmentInfo.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/CdcSegmentInfo.java
index 1745fa9d..a5ae9874 100644
--- 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/CdcSegmentInfo.java
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/CdcSegmentInfo.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.sidecar.common.response.data;
 
+import java.util.Objects;
+
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -45,4 +47,26 @@ public class CdcSegmentInfo
         this.completed = completed;
         this.lastModifiedTimestamp = lastModifiedTimestamp;
     }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+        CdcSegmentInfo that = (CdcSegmentInfo) o;
+        return size == that.size && idx == that.idx && completed == 
that.completed && lastModifiedTimestamp == that.lastModifiedTimestamp &&
+                Objects.equals(name, that.name);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(name, size, idx, completed, lastModifiedTimestamp);
+    }
 }
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java 
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index 9e69432c..f896a79e 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -39,8 +39,10 @@ import 
org.apache.cassandra.sidecar.common.request.AbortRestoreJobRequest;
 import org.apache.cassandra.sidecar.common.request.CreateRestoreJobRequest;
 import 
org.apache.cassandra.sidecar.common.request.CreateRestoreJobSliceRequest;
 import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest;
+import org.apache.cassandra.sidecar.common.request.ListCdcSegmentsRequest;
 import org.apache.cassandra.sidecar.common.request.RestoreJobProgressRequest;
 import org.apache.cassandra.sidecar.common.request.RestoreJobSummaryRequest;
+import org.apache.cassandra.sidecar.common.request.StreamCdcSegmentRequest;
 import org.apache.cassandra.sidecar.common.request.UpdateRestoreJobRequest;
 import 
org.apache.cassandra.sidecar.common.request.data.AbortRestoreJobRequestPayload;
 import 
org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload;
@@ -51,6 +53,7 @@ import 
org.apache.cassandra.sidecar.common.request.data.UpdateRestoreJobRequestP
 import 
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
 import org.apache.cassandra.sidecar.common.response.GossipInfoResponse;
 import org.apache.cassandra.sidecar.common.response.HealthResponse;
+import org.apache.cassandra.sidecar.common.response.ListCdcSegmentsResponse;
 import 
org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse;
 import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse;
 import org.apache.cassandra.sidecar.common.response.NodeSettings;
@@ -491,6 +494,41 @@ public class SidecarClient implements AutoCloseable, 
SidecarClientBlobRestoreExt
                                                             .build());
     }
 
+    /**
+     * Lists CDC commit logs in CDC directory for an instance
+     * @param sidecarInstance instance on which the CDC commit logs are to be 
listed
+     * @return a completable future with List of cdc commitLogs on the 
requested instance
+     */
+    public CompletableFuture<ListCdcSegmentsResponse> 
listCdcSegments(SidecarInstance sidecarInstance)
+    {
+        return executor.executeRequestAsync(requestBuilder()
+                       .singleInstanceSelectionPolicy(sidecarInstance)
+                       .request(new ListCdcSegmentsRequest())
+                       .build());
+    }
+
+    /**
+     * Streams CDC commit log segments from the requested instance.
+     *
+     * Streams the specified {@code range} of a CDC CommitLog from the given 
instance and the
+     * stream is consumed by the {@link StreamConsumer consumer}.
+     *
+     * @param sidecarInstance instance on which the CDC commit logs are to be 
streamed
+     * @param segment segment file name
+     * @param range range of the file to be streamed
+     * @param streamConsumer object that consumes the stream
+     */
+    public void streamCdcSegments(SidecarInstance sidecarInstance,
+                                  String segment,
+                                  HttpRange range,
+                                  StreamConsumer streamConsumer)
+    {
+        executor.streamRequest(requestBuilder()
+                .singleInstanceSelectionPolicy(sidecarInstance)
+                .request(new StreamCdcSegmentRequest(segment, range))
+                .build(), streamConsumer);
+    }
+
     /**
      * {@inheritDoc}
      */
diff --git 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index 3933c066..f32356ae 100644
--- 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++ 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -27,6 +27,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
@@ -45,6 +46,7 @@ import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpHeaderValues;
@@ -70,6 +72,7 @@ import 
org.apache.cassandra.sidecar.common.request.data.XXHash32Digest;
 import 
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
 import org.apache.cassandra.sidecar.common.response.GossipInfoResponse;
 import org.apache.cassandra.sidecar.common.response.HealthResponse;
+import org.apache.cassandra.sidecar.common.response.ListCdcSegmentsResponse;
 import 
org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse;
 import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse;
 import org.apache.cassandra.sidecar.common.response.NodeSettings;
@@ -79,6 +82,7 @@ import 
org.apache.cassandra.sidecar.common.response.SSTableImportResponse;
 import org.apache.cassandra.sidecar.common.response.SchemaResponse;
 import org.apache.cassandra.sidecar.common.response.TimeSkewResponse;
 import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.response.data.CdcSegmentInfo;
 import org.apache.cassandra.sidecar.common.response.data.ClientConnectionEntry;
 import 
org.apache.cassandra.sidecar.common.response.data.CreateRestoreJobResponsePayload;
 import org.apache.cassandra.sidecar.common.response.data.RingEntry;
@@ -1499,6 +1503,76 @@ abstract class SidecarClientTest
         }
     }
 
+    @Test
+    public void testListCdcSegments() throws ExecutionException, 
InterruptedException, JsonProcessingException
+    {
+        List<CdcSegmentInfo> segments = Arrays.asList(new 
CdcSegmentInfo("commit-log1", 100, 100, true, 1732148713725L),
+                new CdcSegmentInfo("commit-log2", 100, 10, false, 
1732148713725L));
+        ListCdcSegmentsResponse listSegmentsResponse = new 
ListCdcSegmentsResponse("localhost", 9043, segments);
+        ObjectMapper mapper = new ObjectMapper();
+
+        MockResponse response = new MockResponse();
+        response.setResponseCode(200);
+        response.setHeader("content-type", "application/json");
+        response.setBody(mapper.writeValueAsString(listSegmentsResponse));
+        enqueue(response);
+
+        SidecarInstance instance = instances.get(0);
+        ListCdcSegmentsResponse result = 
client.listCdcSegments(instance).get();
+        assertThat(result).isNotNull();
+        assertThat(result).isEqualTo(listSegmentsResponse);
+        validateResponseServed(ApiEndpointsV1.LIST_CDC_SEGMENTS_ROUTE);
+    }
+
+    @Test
+    public void testStreamCdcSegments() throws InterruptedException
+    {
+        MockResponse response = new MockResponse();
+        // mock reading the first 12 bytes, i.e. "Test Content" from a large 
blob (1024).
+        response.setResponseCode(200)
+                .setHeader(HttpHeaderNames.CONTENT_TYPE.toString(),
+                           HttpHeaderValues.APPLICATION_OCTET_STREAM)
+                .setHeader(HttpHeaderNames.ACCEPT_RANGES.toString(), "bytes")
+                .setHeader(HttpHeaderNames.CONTENT_RANGE.toString(), "bytes 
0-11/1024")
+                .setBody("Test Content");
+        enqueue(response);
+
+        SidecarInstance instance = instances.get(0);
+        CountDownLatch latch = new CountDownLatch(1);
+        List<byte[]> receivedBytes = new ArrayList<>();
+        StreamConsumer mockStreamConsumer = new StreamConsumer()
+        {
+            @Override
+            public void onRead(StreamBuffer buffer)
+            {
+                assertThat(buffer.readableBytes()).isGreaterThan(0);
+                byte[] dst = new byte[buffer.readableBytes()];
+                buffer.copyBytes(0, dst, 0, buffer.readableBytes());
+                receivedBytes.add(dst);
+            }
+
+            @Override
+            public void onComplete()
+            {
+                latch.countDown();
+            }
+
+            @Override
+            public void onError(Throwable throwable)
+            {
+                latch.countDown();
+            }
+        };
+        client.streamCdcSegments(instance, "testSegment", HttpRange.of(0, 11), 
mockStreamConsumer);
+        latch.await();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        for (byte[] bytes : receivedBytes)
+        {
+            baos.write(bytes, 0, bytes.length);
+        }
+        assertThat(new String(baos.toByteArray(), 
StandardCharsets.UTF_8)).isEqualTo("Test Content");
+    }
+
     private void enqueue(MockResponse response)
     {
         for (MockWebServer server : servers)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to