This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new c1260886 CASSSIDECAR-172: Adding support for CDC APIs into sidecar
client (#158)
c1260886 is described below
commit c1260886642e789fa15a3fb900251bea529fcc09
Author: Jyothsna konisa <[email protected]>
AuthorDate: Sat Dec 14 12:39:41 2024 -0800
CASSSIDECAR-172: Adding support for CDC APIs into sidecar client (#158)
Patch by Jyothsna Konisa; Reviewed by Bernardo Botella, Francisco Guerrero,
James Berragan, Yifan Cai for CASSSIDECAR-172
---
CHANGES.txt | 1 +
.../common/request/ListCdcSegmentsRequest.java | 40 ++++++++++++
.../common/request/StreamCdcSegmentRequest.java | 54 ++++++++++++++++
.../common/response/ListCdcSegmentsResponse.java | 22 +++++++
.../common/response/data/CdcSegmentInfo.java | 24 +++++++
.../cassandra/sidecar/client/SidecarClient.java | 38 +++++++++++
.../sidecar/client/SidecarClientTest.java | 74 ++++++++++++++++++++++
7 files changed, 253 insertions(+)
diff --git a/CHANGES.txt b/CHANGES.txt
index 70cabce9..d78e60ac 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -117,3 +117,4 @@
* Add support for SSL and bindable address (CASSANDRA-15030)
* Autogenerate API docs for sidecar (CASSANDRA-15028)
* C* Management process (CASSANDRA-14395)
+ * Adding support for CDC APIs into sidecar client (CASSSIDECAR-172)
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ListCdcSegmentsRequest.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ListCdcSegmentsRequest.java
new file mode 100644
index 00000000..90331d98
--- /dev/null
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ListCdcSegmentsRequest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.sidecar.common.request;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.response.ListCdcSegmentsResponse;
+
+/**
+ * Represents a request for listing commit log files on an instance
+ */
+public class ListCdcSegmentsRequest extends
JsonRequest<ListCdcSegmentsResponse>
+{
+ public ListCdcSegmentsRequest()
+ {
+ super(ApiEndpointsV1.LIST_CDC_SEGMENTS_ROUTE);
+ }
+
+ @Override
+ public HttpMethod method()
+ {
+ return HttpMethod.GET;
+ }
+}
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/StreamCdcSegmentRequest.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/StreamCdcSegmentRequest.java
new file mode 100644
index 00000000..fe1ac54b
--- /dev/null
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/StreamCdcSegmentRequest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.sidecar.common.request;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.utils.HttpRange;
+
+/**
+ * Represents a request to stream CDC segments(commit logs) on an instance.
+ */
+public class StreamCdcSegmentRequest extends Request
+{
+ private final HttpRange range;
+
+ public StreamCdcSegmentRequest(String segment, HttpRange range)
+ {
+ super(requestURI(segment));
+ this.range = range;
+ }
+
+ @Override
+ public HttpMethod method()
+ {
+ return HttpMethod.GET;
+ }
+
+ @Override
+ protected HttpRange range()
+ {
+ return range;
+ }
+
+ private static String requestURI(String segment)
+ {
+ return
ApiEndpointsV1.STREAM_CDC_SEGMENTS_ROUTE.replaceAll(ApiEndpointsV1.SEGMENT_PATH_PARAM,
segment);
+ }
+}
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponse.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponse.java
index 8b72cb2b..27d66f8b 100644
---
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponse.java
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponse.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.sidecar.common.response;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -65,4 +66,25 @@ public class ListCdcSegmentsResponse
{
return segmentInfos;
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+ ListCdcSegmentsResponse that = (ListCdcSegmentsResponse) o;
+ return port == that.port && Objects.equals(host, that.host) &&
Objects.equals(segmentInfos, that.segmentInfos);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(host, port, segmentInfos);
+ }
}
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/CdcSegmentInfo.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/CdcSegmentInfo.java
index 1745fa9d..a5ae9874 100644
---
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/CdcSegmentInfo.java
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/CdcSegmentInfo.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.sidecar.common.response.data;
+import java.util.Objects;
+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -45,4 +47,26 @@ public class CdcSegmentInfo
this.completed = completed;
this.lastModifiedTimestamp = lastModifiedTimestamp;
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+ CdcSegmentInfo that = (CdcSegmentInfo) o;
+ return size == that.size && idx == that.idx && completed ==
that.completed && lastModifiedTimestamp == that.lastModifiedTimestamp &&
+ Objects.equals(name, that.name);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(name, size, idx, completed, lastModifiedTimestamp);
+ }
}
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index 9e69432c..f896a79e 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -39,8 +39,10 @@ import
org.apache.cassandra.sidecar.common.request.AbortRestoreJobRequest;
import org.apache.cassandra.sidecar.common.request.CreateRestoreJobRequest;
import
org.apache.cassandra.sidecar.common.request.CreateRestoreJobSliceRequest;
import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest;
+import org.apache.cassandra.sidecar.common.request.ListCdcSegmentsRequest;
import org.apache.cassandra.sidecar.common.request.RestoreJobProgressRequest;
import org.apache.cassandra.sidecar.common.request.RestoreJobSummaryRequest;
+import org.apache.cassandra.sidecar.common.request.StreamCdcSegmentRequest;
import org.apache.cassandra.sidecar.common.request.UpdateRestoreJobRequest;
import
org.apache.cassandra.sidecar.common.request.data.AbortRestoreJobRequestPayload;
import
org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload;
@@ -51,6 +53,7 @@ import
org.apache.cassandra.sidecar.common.request.data.UpdateRestoreJobRequestP
import
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
import org.apache.cassandra.sidecar.common.response.GossipInfoResponse;
import org.apache.cassandra.sidecar.common.response.HealthResponse;
+import org.apache.cassandra.sidecar.common.response.ListCdcSegmentsResponse;
import
org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse;
import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse;
import org.apache.cassandra.sidecar.common.response.NodeSettings;
@@ -491,6 +494,41 @@ public class SidecarClient implements AutoCloseable,
SidecarClientBlobRestoreExt
.build());
}
+ /**
+ * Lists CDC commit logs in CDC directory for an instance
+ * @param sidecarInstance instance on which the CDC commit logs are to be
listed
+ * @return a completable future with List of cdc commitLogs on the
requested instance
+ */
+ public CompletableFuture<ListCdcSegmentsResponse>
listCdcSegments(SidecarInstance sidecarInstance)
+ {
+ return executor.executeRequestAsync(requestBuilder()
+ .singleInstanceSelectionPolicy(sidecarInstance)
+ .request(new ListCdcSegmentsRequest())
+ .build());
+ }
+
+ /**
+ * Streams CDC commit log segments from the requested instance.
+ *
+ * Streams the specified {@code range} of a CDC CommitLog from the given
instance and the
+ * stream is consumed by the {@link StreamConsumer consumer}.
+ *
+ * @param sidecarInstance instance on which the CDC commit logs are to be
streamed
+ * @param segment segment file name
+ * @param range range of the file to be streamed
+ * @param streamConsumer object that consumes the stream
+ */
+ public void streamCdcSegments(SidecarInstance sidecarInstance,
+ String segment,
+ HttpRange range,
+ StreamConsumer streamConsumer)
+ {
+ executor.streamRequest(requestBuilder()
+ .singleInstanceSelectionPolicy(sidecarInstance)
+ .request(new StreamCdcSegmentRequest(segment, range))
+ .build(), streamConsumer);
+ }
+
/**
* {@inheritDoc}
*/
diff --git
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index 3933c066..f32356ae 100644
---
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -27,6 +27,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
@@ -45,6 +46,7 @@ import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
@@ -70,6 +72,7 @@ import
org.apache.cassandra.sidecar.common.request.data.XXHash32Digest;
import
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
import org.apache.cassandra.sidecar.common.response.GossipInfoResponse;
import org.apache.cassandra.sidecar.common.response.HealthResponse;
+import org.apache.cassandra.sidecar.common.response.ListCdcSegmentsResponse;
import
org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse;
import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse;
import org.apache.cassandra.sidecar.common.response.NodeSettings;
@@ -79,6 +82,7 @@ import
org.apache.cassandra.sidecar.common.response.SSTableImportResponse;
import org.apache.cassandra.sidecar.common.response.SchemaResponse;
import org.apache.cassandra.sidecar.common.response.TimeSkewResponse;
import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.common.response.data.CdcSegmentInfo;
import org.apache.cassandra.sidecar.common.response.data.ClientConnectionEntry;
import
org.apache.cassandra.sidecar.common.response.data.CreateRestoreJobResponsePayload;
import org.apache.cassandra.sidecar.common.response.data.RingEntry;
@@ -1499,6 +1503,76 @@ abstract class SidecarClientTest
}
}
+ @Test
+ public void testListCdcSegments() throws ExecutionException,
InterruptedException, JsonProcessingException
+ {
+ List<CdcSegmentInfo> segments = Arrays.asList(new
CdcSegmentInfo("commit-log1", 100, 100, true, 1732148713725L),
+ new CdcSegmentInfo("commit-log2", 100, 10, false,
1732148713725L));
+ ListCdcSegmentsResponse listSegmentsResponse = new
ListCdcSegmentsResponse("localhost", 9043, segments);
+ ObjectMapper mapper = new ObjectMapper();
+
+ MockResponse response = new MockResponse();
+ response.setResponseCode(200);
+ response.setHeader("content-type", "application/json");
+ response.setBody(mapper.writeValueAsString(listSegmentsResponse));
+ enqueue(response);
+
+ SidecarInstance instance = instances.get(0);
+ ListCdcSegmentsResponse result =
client.listCdcSegments(instance).get();
+ assertThat(result).isNotNull();
+ assertThat(result).isEqualTo(listSegmentsResponse);
+ validateResponseServed(ApiEndpointsV1.LIST_CDC_SEGMENTS_ROUTE);
+ }
+
+ @Test
+ public void testStreamCdcSegments() throws InterruptedException
+ {
+ MockResponse response = new MockResponse();
+ // mock reading the first 12 bytes, i.e. "Test Content" from a large
blob (1024).
+ response.setResponseCode(200)
+ .setHeader(HttpHeaderNames.CONTENT_TYPE.toString(),
+ HttpHeaderValues.APPLICATION_OCTET_STREAM)
+ .setHeader(HttpHeaderNames.ACCEPT_RANGES.toString(), "bytes")
+ .setHeader(HttpHeaderNames.CONTENT_RANGE.toString(), "bytes
0-11/1024")
+ .setBody("Test Content");
+ enqueue(response);
+
+ SidecarInstance instance = instances.get(0);
+ CountDownLatch latch = new CountDownLatch(1);
+ List<byte[]> receivedBytes = new ArrayList<>();
+ StreamConsumer mockStreamConsumer = new StreamConsumer()
+ {
+ @Override
+ public void onRead(StreamBuffer buffer)
+ {
+ assertThat(buffer.readableBytes()).isGreaterThan(0);
+ byte[] dst = new byte[buffer.readableBytes()];
+ buffer.copyBytes(0, dst, 0, buffer.readableBytes());
+ receivedBytes.add(dst);
+ }
+
+ @Override
+ public void onComplete()
+ {
+ latch.countDown();
+ }
+
+ @Override
+ public void onError(Throwable throwable)
+ {
+ latch.countDown();
+ }
+ };
+ client.streamCdcSegments(instance, "testSegment", HttpRange.of(0, 11),
mockStreamConsumer);
+ latch.await();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ for (byte[] bytes : receivedBytes)
+ {
+ baos.write(bytes, 0, bytes.length);
+ }
+ assertThat(new String(baos.toByteArray(),
StandardCharsets.UTF_8)).isEqualTo("Test Content");
+ }
+
private void enqueue(MockResponse response)
{
for (MockWebServer server : servers)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]