yifan-c commented on code in PR #99:
URL:
https://github.com/apache/cassandra-analytics/pull/99#discussion_r1996359837
##########
cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/CdcBuilder.java:
##########
@@ -41,26 +41,26 @@
public class CdcBuilder
{
@NotNull
- final String jobId;
- final int partitionId;
+ protected final String jobId;
+ protected final int partitionId;
@NotNull
TokenRangeSupplier tokenRangeSupplier = () -> null;
@NotNull
- SchemaSupplier schemaSupplier;
+ protected SchemaSupplier schemaSupplier;
@NotNull
CassandraSource cassandraSource = CassandraSource.DEFAULT;
@NotNull
StatePersister statePersister = StatePersister.STUB;
@NotNull
- CdcOptions cdcOptions = CdcOptions.DEFAULT;
+ protected CdcOptions cdcOptions = CdcOptions.DEFAULT;
@NotNull
ICdcStats stats = ICdcStats.STUB;
@Nullable
Review Comment:
`asyncExecutor` is not `Nullable`. The builder cannot be built if the field
is `null`.
Update the annotation to `@NotNull`?
##########
cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarStatePersister.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.sidecar;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.ThreadLocalMonotonicTimestampGenerator;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.cdc.CdcKryoRegister;
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.StatePersister;
+import org.apache.cassandra.cdc.state.CdcState;
+import org.apache.cassandra.spark.utils.AsyncExecutor;
+import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.apache.cassandra.util.CompressionUtil;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * SidecarStatePersister buffers CDC state and flushes at regular time
intervals, so we only write the latest CDC state and don't wastefully write
expired data.
+ */
+public class SidecarStatePersister implements StatePersister
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SidecarStatePersister.class);
+
+ // group latest state by jobId/token range, so we persist independently
+ protected final ConcurrentHashMap<PersistWrapper.Key, PersistWrapper>
latestState = new ConcurrentHashMap<>();
+ protected final ConcurrentLinkedQueue<TimedFutureWrapper> activeFlush =
new ConcurrentLinkedQueue<>();
+ private final ThreadLocalMonotonicTimestampGenerator timestampGenerator =
new ThreadLocalMonotonicTimestampGenerator();
+ private final SidecarCdcOptions sidecarCdcOptions;
+ private final CdcOptions cdcOptions;
+ private final SidecarCdcCassandraClient cassandraClient;
+ private final SidecarCdcStats sidecarCdcStats;
+ private final AsyncExecutor asyncExecutor;
+ volatile long timerId = -1L;
+
+ public SidecarStatePersister(SidecarCdcOptions sidecarCdcOptions,
+ CdcOptions cdcOptions,
+ SidecarCdcStats sidecarCdcStats,
+ SidecarCdcCassandraClient cassandraClient,
+ AsyncExecutor asyncExecutor)
+ {
+ this.sidecarCdcOptions = sidecarCdcOptions;
+ this.cdcOptions = cdcOptions;
+ this.sidecarCdcStats = sidecarCdcStats;
+ this.cassandraClient = cassandraClient;
+ this.asyncExecutor = asyncExecutor;
+ }
+
+ // StatePersister implemented methods
+
+ @Override
+ public void persist(String jobId, int partitionId, @Nullable TokenRange
tokenRange, @NotNull ByteBuffer buf)
+ {
+ final PersistWrapper latest = new PersistWrapper(jobId, partitionId,
tokenRange, buf, timestampGenerator.next());
+ PersistWrapper.Key key = latest.key();
+ if (!latest.equals(this.latestState.get(key)))
+ {
+ this.latestState.compute(key, (k, prev) -> !latest.equals(prev) ?
latest : prev);
+ }
+ }
+
+ @NotNull
+ @Override
+ public List<CdcState> loadState(String jobId, int partitionId, @Nullable
TokenRange tokenRange)
+ {
+ CompressionUtil compressionUtil =
CdcBridgeFactory.get(cdcOptions.version()).compressionUtil();
+ List<Integer> sizes = new ArrayList<>();
+ // deserialize and merge the CDC state objects into canonical view
+ List<CdcState> result = loadStateForRange(jobId, tokenRange)
+ .peek(bytes -> sizes.add(bytes.length))
+ .map(bytes ->
CdcState.deserialize(CdcKryoRegister.kryo(), compressionUtil, bytes))
+ .collect(Collectors.toList());
+ int count = sizes.size();
+ int len = sizes.stream().mapToInt(i -> i).sum();
+ LOGGER.debug("Read CDC state from Cassandra jobId={} start={} end={}
stateCount={} stateSize={}",
+ jobId, tokenRange == null ? "null" :
tokenRange.lowerEndpoint(), tokenRange == null ? "null" :
tokenRange.upperEndpoint(), count, len);
+ sidecarCdcStats.captureCdcConsumerReadFromState(count, len);
+ return result;
+ }
+
+ @VisibleForTesting
+ public Stream<byte[]> loadStateForRange(String jobId, @Nullable TokenRange
tokenRange)
+ {
+ return cassandraClient
+ .loadStateForRange(jobId, tokenRange);
+ }
+
+ /**
+ * Start the SidecarStatePersister to flush to Cassandra every
`persistDelay()`
+ */
+ public synchronized void start()
+ {
+ if (timerId >= 0)
+ {
+ // already running
+ return;
+ }
+ this.timerId = asyncExecutor.periodicTimer(this::persistToCassandra,
sidecarCdcOptions.persistDelay().toMillis());
+ }
+
+ /**
+ * Stop the SidecarStatePersister gracefully, blocking to await for any
pending flushes to complete.
+ */
+ public void stop()
+ {
+ stop(true);
+ }
+
+ public synchronized void stop(boolean flush)
+ {
+ if (this.timerId < 0)
+ {
+ // not running
+ return;
+ }
+
+ asyncExecutor.cancelTimer(this.timerId);
+ this.timerId = -1;
+
+ if (flush)
+ {
+ flush();
+ }
+ }
+
+ // internal methods
+
+ protected void persistToCassandra()
+ {
+ persistToCassandra(false);
+ }
+
+ protected void persistToCassandra(boolean force)
+ {
+ // clean-up finished futures
+ activeFlush.removeIf(wrapper -> {
+ if (wrapper.allDone())
+ {
+ try
+ {
+ wrapper.await();
+ sidecarCdcStats.capturePersistSucceeded(System.nanoTime()
- wrapper.startTimeNanos);
+ }
+ catch (InterruptedException e)
+ {
+ LOGGER.warn("Persist failed with InterruptedException", e);
+ Thread.currentThread().interrupt();
+ sidecarCdcStats.capturePersistFailed(e);
+ }
+ catch (ExecutionException e)
+ {
+ LOGGER.warn("Persist failed", e);
+ sidecarCdcStats.capturePersistFailed(e);
+ }
Review Comment:
Maybe just catch `Throwable` to be safe.
##########
cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.sidecar;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import o.a.c.sidecar.client.shaded.common.utils.HttpRange;
+import org.apache.cassandra.cdc.api.CommitLog;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.clients.Sidecar;
+import org.apache.cassandra.secrets.SecretsProvider;
+import org.apache.cassandra.sidecar.client.SidecarClient;
+import org.apache.cassandra.sidecar.client.SidecarInstance;
+import org.apache.cassandra.sidecar.client.SidecarInstancesProvider;
+import org.apache.cassandra.sidecar.client.StreamBuffer;
+import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
+import org.apache.cassandra.spark.exceptions.TransportFailureException;
+import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.apache.cassandra.spark.utils.streaming.StreamConsumer;
+
+public class SidecarCdcClient
+{
+ final Sidecar.ClientConfig config;
+ final SidecarClient sidecarClient;
+ final ICdcStats stats;
+
+ public SidecarCdcClient(Sidecar.ClientConfig config,
+ SidecarClient sidecarClient,
+ ICdcStats stats)
+ {
+ this.config = config;
+ this.sidecarClient = sidecarClient;
+ this.stats = stats;
+ }
+
+ public CompletableFuture<List<CommitLog>>
listCdcCommitLogSegments(CassandraInstance instance)
+ {
+ return sidecarClient.listCdcSegments(toSidecarInstance(instance))
+ .thenApply(
+ response ->
+ response.segmentInfos()
+ .stream()
+ .map(segment -> (CommitLog) new
SidecarCdcCommitLogSegment(this, instance, segment, config))
+ .collect(Collectors.toList())
+ ).exceptionally(throwable -> {
+ final Throwable cause = ThrowableUtils.rootCause(throwable);
+ if (cause instanceof TransportFailureException.Nonretryable
+ && ((TransportFailureException.Nonretryable)
cause).isNotFound())
+ {
+ // Rescue the 404 not found exception - it is a permitted error
+ return Collections.emptyList();
+ }
+ // Rethrow the other exception
+ if (throwable instanceof Error)
+ {
+ throw (Error) throwable;
+ }
+ throw new RuntimeException(cause);
+ });
+ }
+
+ public void streamCdcCommitLogSegment(CassandraInstance instance, String
segment, HttpRange httpRange, StreamConsumer streamConsumer)
+ {
+ sidecarClient.streamCdcSegments(toSidecarInstance(instance), segment,
httpRange, new org.apache.cassandra.sidecar.client.StreamConsumer()
+ {
+ @Override
+ public void onRead(StreamBuffer streamBuffer)
+ {
+ streamConsumer.onRead(new
org.apache.cassandra.spark.utils.streaming.StreamBuffer()
+ {
+ @Override
+ public void getBytes(int index, ByteBuffer destination,
int length)
+ {
+ streamBuffer.copyBytes(index, destination, length);
+ }
+
+ @Override
+ public void getBytes(int index, byte[] destination, int
destinationIndex, int length)
+ {
+ streamBuffer.copyBytes(index, destination,
destinationIndex, length);
+ }
+
+ @Override
+ public byte getByte(int index)
+ {
+ return streamBuffer.getByte(index);
+ }
+
+ @Override
+ public int readableBytes()
+ {
+ return streamBuffer.readableBytes();
+ }
+
+ @Override
+ public void release()
+ {
+ streamBuffer.release();
+ }
+ });
+ }
+
+ @Override
+ public void onComplete()
+ {
+ streamConsumer.onEnd();
+ }
+
+ @Override
+ public void onError(Throwable throwable)
+ {
+ streamConsumer.onError(throwable);
+ }
+ });
+ }
+
+ protected SidecarInstance toSidecarInstance(CassandraInstance instance)
+ {
+ return new SidecarInstance()
+ {
+ @Override
+ public int port()
+ {
+ return config.effectivePort();
+ }
+
+ @Override
+ public String hostname()
+ {
+ return instance.nodeName();
+ }
+ };
+ }
+
+ public static SidecarCdcClient from(SidecarInstancesProvider
sidecarInstancesProvider,
+ Sidecar.ClientConfig config,
+ SecretsProvider secretsProvider,
+ ICdcStats stats) throws IOException
+ {
+ return new SidecarCdcClient(config,
Sidecar.from(sidecarInstancesProvider, config, secretsProvider), stats);
+ }
Review Comment:
Unused.
##########
cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarStatePersister.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.sidecar;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.ThreadLocalMonotonicTimestampGenerator;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.cdc.CdcKryoRegister;
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.StatePersister;
+import org.apache.cassandra.cdc.state.CdcState;
+import org.apache.cassandra.spark.utils.AsyncExecutor;
+import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.apache.cassandra.util.CompressionUtil;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * SidecarStatePersister buffers CDC state and flushes at regular time
intervals, so we only write the latest CDC state and don't wastefully write
expired data.
+ */
+public class SidecarStatePersister implements StatePersister
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SidecarStatePersister.class);
+
+ // group latest state by jobId/token range, so we persist independently
+ protected final ConcurrentHashMap<PersistWrapper.Key, PersistWrapper>
latestState = new ConcurrentHashMap<>();
+ protected final ConcurrentLinkedQueue<TimedFutureWrapper> activeFlush =
new ConcurrentLinkedQueue<>();
+ private final ThreadLocalMonotonicTimestampGenerator timestampGenerator =
new ThreadLocalMonotonicTimestampGenerator();
+ private final SidecarCdcOptions sidecarCdcOptions;
+ private final CdcOptions cdcOptions;
+ private final SidecarCdcCassandraClient cassandraClient;
+ private final SidecarCdcStats sidecarCdcStats;
+ private final AsyncExecutor asyncExecutor;
+ volatile long timerId = -1L;
+
+ public SidecarStatePersister(SidecarCdcOptions sidecarCdcOptions,
+ CdcOptions cdcOptions,
+ SidecarCdcStats sidecarCdcStats,
+ SidecarCdcCassandraClient cassandraClient,
+ AsyncExecutor asyncExecutor)
+ {
+ this.sidecarCdcOptions = sidecarCdcOptions;
+ this.cdcOptions = cdcOptions;
+ this.sidecarCdcStats = sidecarCdcStats;
+ this.cassandraClient = cassandraClient;
+ this.asyncExecutor = asyncExecutor;
+ }
+
+ // StatePersister implemented methods
+
+ @Override
+ public void persist(String jobId, int partitionId, @Nullable TokenRange
tokenRange, @NotNull ByteBuffer buf)
+ {
+ final PersistWrapper latest = new PersistWrapper(jobId, partitionId,
tokenRange, buf, timestampGenerator.next());
+ PersistWrapper.Key key = latest.key();
+ if (!latest.equals(this.latestState.get(key)))
+ {
+ this.latestState.compute(key, (k, prev) -> !latest.equals(prev) ?
latest : prev);
+ }
+ }
+
+ @NotNull
+ @Override
+ public List<CdcState> loadState(String jobId, int partitionId, @Nullable
TokenRange tokenRange)
+ {
+ CompressionUtil compressionUtil =
CdcBridgeFactory.get(cdcOptions.version()).compressionUtil();
+ List<Integer> sizes = new ArrayList<>();
+ // deserialize and merge the CDC state objects into canonical view
+ List<CdcState> result = loadStateForRange(jobId, tokenRange)
+ .peek(bytes -> sizes.add(bytes.length))
+ .map(bytes ->
CdcState.deserialize(CdcKryoRegister.kryo(), compressionUtil, bytes))
Review Comment:
nit:
```suggestion
.map(bytes -> {
sizes.add(bytes.length);
return
CdcState.deserialize(CdcKryoRegister.kryo(), compressionUtil, bytes);
})
```
##########
cassandra-analytics-sidecar-client/src/main/java/org/apache/cassandra/clients/Sidecar.java:
##########
@@ -122,51 +119,6 @@ public static SidecarClient from(SidecarInstancesProvider
sidecarInstancesProvid
return buildClient(sidecarConfig, vertx, httpClientConfig,
sidecarInstancesProvider);
}
- static String transportModeBasedWriterUserAgent(DataTransport transport)
- {
- switch (transport)
- {
- case S3_COMPAT:
- return BuildInfo.WRITER_S3_USER_AGENT;
- case DIRECT:
- default:
- return BuildInfo.WRITER_USER_AGENT;
- }
- }
-
- public static SidecarClient from(SidecarInstancesProvider
sidecarInstancesProvider, BulkSparkConf conf)
- {
- Vertx vertx = Vertx.vertx(new VertxOptions().setUseDaemonThread(true)
-
.setWorkerPoolSize(conf.getMaxHttpConnections()));
-
Review Comment:
Just curious, what prevents it from being kept in the same file (instead of
moving it to `AnalyticsSidecarClient`)? I think the idea is to consolidate further.
##########
cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarStatePersister.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.sidecar;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.ThreadLocalMonotonicTimestampGenerator;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.cdc.CdcKryoRegister;
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.StatePersister;
+import org.apache.cassandra.cdc.state.CdcState;
+import org.apache.cassandra.spark.utils.AsyncExecutor;
+import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.apache.cassandra.util.CompressionUtil;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * SidecarStatePersister buffers CDC state and flushes at regular time
intervals, so we only write the latest CDC state and don't wastefully write
expired data.
+ */
+public class SidecarStatePersister implements StatePersister
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SidecarStatePersister.class);
+
+ // group latest state by jobId/token range, so we persist independently
+ protected final ConcurrentHashMap<PersistWrapper.Key, PersistWrapper>
latestState = new ConcurrentHashMap<>();
+ protected final ConcurrentLinkedQueue<TimedFutureWrapper> activeFlush =
new ConcurrentLinkedQueue<>();
+ private final ThreadLocalMonotonicTimestampGenerator timestampGenerator =
new ThreadLocalMonotonicTimestampGenerator();
+ private final SidecarCdcOptions sidecarCdcOptions;
+ private final CdcOptions cdcOptions;
+ private final SidecarCdcCassandraClient cassandraClient;
+ private final SidecarCdcStats sidecarCdcStats;
+ private final AsyncExecutor asyncExecutor;
+ volatile long timerId = -1L;
+
+ public SidecarStatePersister(SidecarCdcOptions sidecarCdcOptions,
+ CdcOptions cdcOptions,
+ SidecarCdcStats sidecarCdcStats,
+ SidecarCdcCassandraClient cassandraClient,
+ AsyncExecutor asyncExecutor)
+ {
+ this.sidecarCdcOptions = sidecarCdcOptions;
+ this.cdcOptions = cdcOptions;
+ this.sidecarCdcStats = sidecarCdcStats;
+ this.cassandraClient = cassandraClient;
+ this.asyncExecutor = asyncExecutor;
+ }
+
+ // StatePersister implemented methods
+
+ @Override
+ public void persist(String jobId, int partitionId, @Nullable TokenRange
tokenRange, @NotNull ByteBuffer buf)
+ {
+ final PersistWrapper latest = new PersistWrapper(jobId, partitionId,
tokenRange, buf, timestampGenerator.next());
+ PersistWrapper.Key key = latest.key();
+ if (!latest.equals(this.latestState.get(key)))
+ {
+ this.latestState.compute(key, (k, prev) -> !latest.equals(prev) ?
latest : prev);
+ }
+ }
+
+ @NotNull
+ @Override
+ public List<CdcState> loadState(String jobId, int partitionId, @Nullable
TokenRange tokenRange)
+ {
+ CompressionUtil compressionUtil =
CdcBridgeFactory.get(cdcOptions.version()).compressionUtil();
+ List<Integer> sizes = new ArrayList<>();
+ // deserialize and merge the CDC state objects into canonical view
+ List<CdcState> result = loadStateForRange(jobId, tokenRange)
+ .peek(bytes -> sizes.add(bytes.length))
+ .map(bytes ->
CdcState.deserialize(CdcKryoRegister.kryo(), compressionUtil, bytes))
+ .collect(Collectors.toList());
+ int count = sizes.size();
+ int len = sizes.stream().mapToInt(i -> i).sum();
+ LOGGER.debug("Read CDC state from Cassandra jobId={} start={} end={}
stateCount={} stateSize={}",
+ jobId, tokenRange == null ? "null" :
tokenRange.lowerEndpoint(), tokenRange == null ? "null" :
tokenRange.upperEndpoint(), count, len);
+ sidecarCdcStats.captureCdcConsumerReadFromState(count, len);
+ return result;
+ }
+
+ @VisibleForTesting
+ public Stream<byte[]> loadStateForRange(String jobId, @Nullable TokenRange
tokenRange)
+ {
+ return cassandraClient
+ .loadStateForRange(jobId, tokenRange);
+ }
+
+ /**
+ * Start the SidecarStatePersister to flush to Cassandra every
`persistDelay()`
+ */
+ public synchronized void start()
+ {
+ if (timerId >= 0)
+ {
+ // already running
+ return;
+ }
+ this.timerId = asyncExecutor.periodicTimer(this::persistToCassandra,
sidecarCdcOptions.persistDelay().toMillis());
+ }
+
+ /**
+ * Stop the SidecarStatePersister gracefully, blocking to await for any
pending flushes to complete.
+ */
+ public void stop()
+ {
+ stop(true);
+ }
+
+ public synchronized void stop(boolean flush)
+ {
+ if (this.timerId < 0)
+ {
+ // not running
+ return;
+ }
+
+ asyncExecutor.cancelTimer(this.timerId);
+ this.timerId = -1;
Review Comment:
Relying on the condition `timerId == -1` might not be sufficient. The
scheduled run could still be running when `cancelTimer` returns. However, it is
not something to be addressed in this patch. Just calling it out.
##########
cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/CdcBuilder.java:
##########
@@ -41,26 +41,26 @@
public class CdcBuilder
{
@NotNull
- final String jobId;
- final int partitionId;
+ protected final String jobId;
+ protected final int partitionId;
@NotNull
TokenRangeSupplier tokenRangeSupplier = () -> null;
@NotNull
- SchemaSupplier schemaSupplier;
+ protected SchemaSupplier schemaSupplier;
@NotNull
CassandraSource cassandraSource = CassandraSource.DEFAULT;
@NotNull
StatePersister statePersister = StatePersister.STUB;
@NotNull
- CdcOptions cdcOptions = CdcOptions.DEFAULT;
+ protected CdcOptions cdcOptions = CdcOptions.DEFAULT;
@NotNull
ICdcStats stats = ICdcStats.STUB;
@Nullable
- AsyncExecutor asyncExecutor = null;
+ protected AsyncExecutor asyncExecutor = null;
@Nullable
- CommitLogProvider commitLogProvider = null;
+ protected CommitLogProvider commitLogProvider = null;
@NotNull
- EventConsumer eventConsumer;
+ protected EventConsumer eventConsumer;
@NotNull
Review Comment:
Those are `@NotNull`
##########
cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCommitLogProviderTests.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.sidecar;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.cdc.api.CommitLog;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.NotNull;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SidecarCommitLogProviderTests
+{
+ private static final List<CassandraInstance> INSTANCES = Arrays.asList(
+ new CassandraInstance("0", "local1", "DC1"),
+ new CassandraInstance("100", "local2", "DC1"),
+ new CassandraInstance("200", "local3", "DC1"),
+ new CassandraInstance("300", "local4", "DC1"),
+ new CassandraInstance("400", "local5", "DC1"),
+ new CassandraInstance("500", "local6", "DC1"),
+ new CassandraInstance("1", "local7", "DC2"),
+ new CassandraInstance("101", "local8", "DC2"),
+ new CassandraInstance("201", "local9", "DC2"),
+ new CassandraInstance("301", "local10", "DC2"),
+ new CassandraInstance("401", "local11", "DC2"),
+ new CassandraInstance("501", "local12", "DC2")
+ );
+
+ @Test
+ public void testCommitLogProvider()
+ {
+ CassandraInstance instance1 = INSTANCES.get(0);
+ CassandraInstance instance2 = INSTANCES.get(1);
+ CassandraInstance instance3 = INSTANCES.get(2);
+ CassandraInstance instance4 = INSTANCES.get(3);
+ CassandraInstance instance5 = INSTANCES.get(4);
+ CassandraInstance instance6 = INSTANCES.get(5);
+
+ ClusterConfigProvider clusterConfigProvider =
getClusterConfigProvider();
+ SidecarCdcClient sidecarCdcClient = mock(SidecarCdcClient.class);
+ Map<CassandraInstance, Integer> instanceListCount = new HashMap<>();
+ MutableInt segmentIdGenerator = new MutableInt(0);
+ Answer<CompletableFuture<List<CommitLog>>> listCommitLogAnswer =
invocation -> {
+ CassandraInstance instance = invocation.getArgument(0,
CassandraInstance.class);
+ instanceListCount.compute(instance, (key, prev) -> prev == null ?
1 : prev + 1);
+ int instanceId =
Integer.parseInt(instance.nodeName().substring(5));
+ int numLogs = instanceId * 13;
+ List<CommitLog> logs = IntStream.rangeClosed(1, numLogs)
+ .mapToObj(segmentId ->
mockCommitLog(instance, segmentIdGenerator.getAndAdd(1)))
+ .collect(Collectors.toList());
+ return CompletableFuture.completedFuture(logs);
+ };
+
+
when(sidecarCdcClient.listCdcCommitLogSegments(any(CassandraInstance.class))).thenAnswer(listCommitLogAnswer);
+ SidecarCommitLogProvider commitLogProvider =
getSidecarCommitLogProvider(clusterConfigProvider, sidecarCdcClient);
+
+ // list logs on instance1, instance2, instance3
+ TokenRange tokenRange = TokenRange.openClosed(
+ new BigInteger("-50"),
+ new BigInteger("-10")
+ );
+ List<CommitLog> logs = commitLogProvider
+ .logs(tokenRange)
+ .sorted()
+ .collect(Collectors.toList());
+ int expectedNumberOfLogs = 13 + (2 * 13) + (3 * 13);
+ assertEquals(expectedNumberOfLogs, logs.size());
+ IntStream.range(0, logs.size())
+ .forEach(index -> assertEquals(index,
logs.get(index).segmentId()));
+ assertEquals(3, instanceListCount.size());
+ assertEquals(1, instanceListCount.get(instance1));
+ assertEquals(1, instanceListCount.get(instance2));
+ assertEquals(1, instanceListCount.get(instance3));
+
+ // list logs on instance4, instance5, instance6
+ TokenRange tokenRange2 = TokenRange.openClosed(
+ new BigInteger("210"),
+ new BigInteger("280")
+ );
+ List<CommitLog> logs2 = commitLogProvider
+ .logs(tokenRange2)
+ .collect(Collectors.toList());
+ expectedNumberOfLogs = (4 * 13) + (5 * 13) + (6 * 13);
Review Comment:
nit: extract "4, 5, 6 and 13" into variables to improve readability.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]