This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new f123406 CASSANDRA-19909: Add writer options COORDINATED_WRITE_CONFIG
to define coordinated write to multiple Cassandra clusters (#79)
f123406 is described below
commit f123406e458c0112145f37dcd3f8c20ba47c949d
Author: Yifan Cai <[email protected]>
AuthorDate: Wed Sep 11 21:03:47 2024 -0700
CASSANDRA-19909: Add writer options COORDINATED_WRITE_CONFIG to define
coordinated write to multiple Cassandra clusters (#79)
The option specifies the configuration (in JSON) for coordinated write.
See
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf.
When the option is present, the SIDECAR_CONTACT_POINTS, SIDECAR_INSTANCES and
LOCAL_DC options are ignored, if set.
Patch by Yifan Cai; Reviewed by Doug Rohrer, Francisco Guerrero for
CASSANDRA-19909
---
CHANGES.txt | 1 +
.../spark/common/model/CassandraInstance.java | 14 +-
.../java/org/apache/cassandra/clients/Sidecar.java | 2 +-
.../cassandra/spark/bulkwriter/BulkSparkConf.java | 56 +++++-
.../spark/bulkwriter/CassandraClusterInfo.java | 44 ++---
.../spark/bulkwriter/CassandraContext.java | 6 +-
.../spark/bulkwriter/CassandraJobInfo.java | 9 +
.../cassandra/spark/bulkwriter/ClusterInfo.java | 20 +++
.../apache/cassandra/spark/bulkwriter/JobInfo.java | 7 +
.../cassandra/spark/bulkwriter/RingInstance.java | 58 ++++--
.../cassandra/spark/bulkwriter/WriterOptions.java | 6 +
.../coordinatedwrite/CoordinatedWriteConf.java | 199 +++++++++++++++++++++
.../spark/common/SidecarInstanceFactory.java | 12 ++
.../cassandra/spark/data/CassandraDataLayer.java | 12 +-
.../spark/bulkwriter/BulkSparkConfTest.java | 66 +++++++
.../spark/bulkwriter/BulkWriteValidatorTest.java | 4 +-
.../ImportCompletionCoordinatorTest.java | 1 +
.../spark/bulkwriter/MockBulkWriterContext.java | 15 ++
.../bulkwriter/RingInstanceSerializationTest.java | 2 +-
.../spark/bulkwriter/RingInstanceTest.java | 129 ++++++++-----
.../spark/bulkwriter/TokenRangeMappingUtils.java | 10 +-
.../coordinatedwrite/CoordinatedWriteConfTest.java | 118 ++++++++++++
22 files changed, 691 insertions(+), 100 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 33bcdd0..8e70567 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.0.0
+ * Add writer option COORDINATED_WRITE_CONFIG to define coordinated write to
multiple Cassandra clusters (CASSANDRA-19909)
* Decouple Cassandra types from Spark types so Cassandra types can be used
independently from Spark (CASSANDRA-19815)
* Make the compression cache configurable to reduce heap pressure for large
SSTables (CASSANDRA-19900)
* Refactor TokenRangeMapping to use proper types instead of Strings
(CASSANDRA-19901)
diff --git
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java
index 3820efd..c69b6eb 100644
---
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java
+++
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java
@@ -20,16 +20,26 @@
package org.apache.cassandra.spark.common.model;
import org.apache.cassandra.spark.data.model.TokenOwner;
+import org.jetbrains.annotations.Nullable;
public interface CassandraInstance extends TokenOwner
{
+ /**
+ * @return ID string that can uniquely identify a cluster; the return
value is nullable
+ */
+ @Nullable String clusterId();
+
+ default boolean hasClusterId()
+ {
+ return clusterId() != null;
+ }
+
String nodeName();
String datacenter();
/**
- * IP address string of a Cassandra instance.
- * Mainly used in blocked instance list to identify instances.
+ * IP address string (w/o port) of a Cassandra instance.
* Prefer to use {@link #ipAddressWithPort} as instance identifier,
* unless knowing the compared is IP address without port for sure.
*/
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
index 6d558dd..2f17040 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
@@ -184,7 +184,7 @@ public final class Sidecar
}
public static List<CompletableFuture<NodeSettings>>
allNodeSettings(SidecarClient client,
- Set<?
extends SidecarInstance> instances)
+
Set<SidecarInstance> instances)
{
return instances.stream()
.map(instance -> client
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index f03e743..3c45546 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -42,6 +42,8 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.sidecar.client.SidecarInstance;
import org.apache.cassandra.spark.bulkwriter.blobupload.StorageClientConfig;
+import
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf;
+import
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf.SimpleClusterConf;
import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
import org.apache.cassandra.spark.bulkwriter.util.SbwKryoRegistrator;
import org.apache.cassandra.spark.common.SidecarInstanceFactory;
@@ -148,7 +150,9 @@ public class BulkSparkConf implements Serializable
protected int ringRetryCount;
// create sidecarInstances from sidecarContactPointsValue and
effectiveSidecarPort
private final String sidecarContactPointsValue; // It takes comma
separated values
- private transient Set<? extends SidecarInstance> sidecarContactPoints; //
not serialized
+ private transient Set<SidecarInstance> sidecarContactPoints; // not
serialized
+ private final String coordinatedWriteConfJson;
+ private transient CoordinatedWriteConf coordinatedWriteConf; // it is
transient; deserialized from coordinatedWriteConfJson in executors
public BulkSparkConf(SparkConf conf, Map<String, String> options)
{
@@ -223,7 +227,8 @@ public class BulkSparkConf implements Serializable
}
this.jobTimeoutSeconds = MapUtils.getLong(options,
WriterOptions.JOB_TIMEOUT_SECONDS.name(), -1L);
this.configuredJobId = MapUtils.getOrDefault(options,
WriterOptions.JOB_ID.name(), null);
-
+ this.coordinatedWriteConfJson = MapUtils.getOrDefault(options,
WriterOptions.COORDINATED_WRITE_CONFIG.name(), null);
+ this.coordinatedWriteConf =
buildCoordinatedWriteConf(dataTransportInfo.getTransport());
validateEnvironment();
}
@@ -263,7 +268,7 @@ public class BulkSparkConf implements Serializable
});
}
- protected Set<? extends SidecarInstance> buildSidecarContactPoints()
+ protected Set<SidecarInstance> buildSidecarContactPoints()
{
String[] split = Objects.requireNonNull(sidecarContactPointsValue,
"Unable to build sidecar instances from null value")
.split(",");
@@ -273,7 +278,7 @@ public class BulkSparkConf implements Serializable
.collect(Collectors.toSet());
}
- Set<? extends SidecarInstance> sidecarContactPoints()
+ Set<SidecarInstance> sidecarContactPoints()
{
if (sidecarContactPoints == null)
{
@@ -282,6 +287,45 @@ public class BulkSparkConf implements Serializable
return sidecarContactPoints;
}
+    /**
+     * Whether coordinated write (writing to multiple Cassandra clusters) is configured,
+     * i.e. the COORDINATED_WRITE_CONFIG writer option is present.
+     * <p>
+     * The check reads the serialized JSON value rather than the transient {@code coordinatedWriteConf} field.
+     * The transient field is null after this object is deserialized in executors (until
+     * {@link #coordinatedWriteConf()} rebuilds it), so checking it would wrongly report "not configured" there.
+     *
+     * @return true if COORDINATED_WRITE_CONFIG is present; false otherwise
+     */
+    public boolean isCoordinatedWriteConfigured()
+    {
+        return coordinatedWriteConfJson != null;
+    }
+
+    /**
+     * Returns the coordinated write configuration.
+     * The value is built lazily from the serialized JSON when the transient field is not set,
+     * e.g. after this object is deserialized in executors.
+     *
+     * @return the CoordinatedWriteConf, or null when COORDINATED_WRITE_CONFIG is not present
+     */
+    @Nullable
+    public CoordinatedWriteConf coordinatedWriteConf()
+    {
+        if (coordinatedWriteConf == null)
+        {
+            coordinatedWriteConf = buildCoordinatedWriteConf(dataTransportInfo.getTransport());
+        }
+
+        return coordinatedWriteConf;
+    }
+
+    /**
+     * Builds the CoordinatedWriteConf from the COORDINATED_WRITE_CONFIG JSON value.
+     * Logs a warning when SIDECAR_CONTACT_POINTS/SIDECAR_INSTANCES or LOCAL_DC are also set,
+     * since those options are ignored in coordinated write mode.
+     *
+     * @param dataTransport the configured data transport; coordinated write only supports S3_COMPAT
+     * @return the parsed CoordinatedWriteConf, or null when COORDINATED_WRITE_CONFIG is not present
+     * @throws IllegalArgumentException if the data transport is not S3_COMPAT or the JSON cannot be parsed
+     * @throws IllegalStateException    if the consistency level is local and a cluster has no localDc configured
+     */
+    @Nullable
+    protected CoordinatedWriteConf buildCoordinatedWriteConf(DataTransport dataTransport)
+    {
+        if (coordinatedWriteConfJson == null)
+        {
+            return null;
+        }
+
+        Preconditions.checkArgument(dataTransport == DataTransport.S3_COMPAT,
+                                    "Coordinated write only supports " + DataTransport.S3_COMPAT);
+
+        if (sidecarContactPointsValue != null)
+        {
+            LOGGER.warn("SIDECAR_CONTACT_POINTS or SIDECAR_INSTANCES are ignored on the presence of COORDINATED_WRITE_CONFIG");
+        }
+
+        if (localDC != null)
+        {
+            LOGGER.warn("LOCAL_DC is ignored on the presence of COORDINATED_WRITE_CONFIG");
+        }
+
+        return CoordinatedWriteConf.create(coordinatedWriteConfJson, consistencyLevel, SimpleClusterConf.class);
+    }
+
protected void validateEnvironment() throws RuntimeException
{
Preconditions.checkNotNull(keyspace);
@@ -351,12 +395,12 @@ public class BulkSparkConf implements Serializable
return truststorePath;
}
- protected TTLOption getTTLOptions()
+ public TTLOption getTTLOptions()
{
return TTLOption.from(ttl);
}
- protected TimestampOption getTimestampOptions()
+ public TimestampOption getTimestampOptions()
{
return TimestampOption.from(timestamp);
}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
index 359dc11..4165c7c 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
@@ -36,7 +36,6 @@ import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import o.a.c.sidecar.client.shaded.common.response.GossipInfoResponse;
import o.a.c.sidecar.client.shaded.common.response.NodeSettings;
import o.a.c.sidecar.client.shaded.common.response.SchemaResponse;
import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse;
@@ -63,19 +62,26 @@ public class CassandraClusterInfo implements ClusterInfo,
Closeable
private static final Logger LOGGER =
LoggerFactory.getLogger(CassandraClusterInfo.class);
protected final BulkSparkConf conf;
+ protected final String clusterId;
protected String cassandraVersion;
protected Partitioner partitioner;
- protected transient TokenRangeMapping<RingInstance> tokenRangeReplicas;
- protected transient String keyspaceSchema;
- protected transient GossipInfoResponse gossipInfo;
- protected transient CassandraContext cassandraContext;
+ protected transient volatile TokenRangeMapping<RingInstance>
tokenRangeReplicas;
+ protected transient volatile String keyspaceSchema;
+ protected transient volatile CassandraContext cassandraContext;
protected final transient AtomicReference<NodeSettings> nodeSettings;
protected final transient List<CompletableFuture<NodeSettings>>
allNodeSettingFutures;
public CassandraClusterInfo(BulkSparkConf conf)
+ {
+ this(conf, null);
+ }
+
+ // Used by CassandraClusterInfoGroup
+ public CassandraClusterInfo(BulkSparkConf conf, String clusterId)
{
this.conf = conf;
+ this.clusterId = clusterId;
this.cassandraContext = buildCassandraContext();
LOGGER.info("Getting Cassandra versions from all nodes");
this.nodeSettings = new AtomicReference<>(null);
@@ -113,6 +119,12 @@ public class CassandraClusterInfo implements ClusterInfo,
Closeable
}
}
+ @Override
+ public String clusterId()
+ {
+ return clusterId;
+ }
+
/**
* Gets a Cassandra Context
* <p>
@@ -191,15 +203,11 @@ public class CassandraClusterInfo implements ClusterInfo,
Closeable
}
@Override
- public void refreshClusterInfo()
+ public synchronized void refreshClusterInfo()
{
- synchronized (this)
- {
- // Set backing stores to null and let them lazy-load on the next
call
- gossipInfo = null;
- keyspaceSchema = null;
- getCassandraContext().refreshClusterConfig();
- }
+ // Set backing stores to null and let them lazy-load on the next call
+ keyspaceSchema = null;
+ getCassandraContext().refreshClusterConfig();
}
protected String getCurrentKeyspaceSchema() throws Exception
@@ -232,13 +240,6 @@ public class CassandraClusterInfo implements ClusterInfo,
Closeable
}
}
- private Set<String>
readReplicasFromTokenRangeResponse(TokenRangeReplicasResponse response)
- {
- return response.readReplicas().stream()
- .flatMap(rr ->
rr.replicasByDatacenter().values().stream())
- .flatMap(List::stream).collect(Collectors.toSet());
- }
-
@NotNull
protected ReplicationFactor getReplicationFactor()
{
@@ -287,7 +288,8 @@ public class CassandraClusterInfo implements ClusterInfo,
Closeable
return topology;
}
- // Block for the call-sites requesting the latest view of the ring;
but it is OK to serve the other call-sites that request for the cached view
+ // Block only for the call-sites requesting the latest view of the ring
+ // The other call-sites get the cached/stale view
// We can avoid synchronization here
if (topology != null)
{
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java
index 1da6083..41a7be1 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java
@@ -40,7 +40,7 @@ public class CassandraContext implements StartupValidatable,
Closeable
{
private static final Logger LOGGER =
LoggerFactory.getLogger(CassandraContext.class);
@NotNull
- protected Set<? extends SidecarInstance> clusterConfig;
+ protected Set<SidecarInstance> clusterConfig;
private final BulkSparkConf conf;
private final transient SidecarClient sidecarClient;
@@ -57,7 +57,7 @@ public class CassandraContext implements StartupValidatable,
Closeable
return new CassandraContext(conf);
}
- public Set<? extends SidecarInstance> getCluster()
+ public Set<SidecarInstance> getCluster()
{
return clusterConfig;
}
@@ -86,7 +86,7 @@ public class CassandraContext implements StartupValidatable,
Closeable
return Sidecar.from(new SimpleSidecarInstancesProvider(new
ArrayList<>(clusterConfig)), conf);
}
- protected Set<? extends SidecarInstance> createClusterConfig()
+ protected Set<SidecarInstance> createClusterConfig()
{
return conf.sidecarContactPoints();
}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
index 39ba908..8e12f07 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
@@ -21,9 +21,11 @@ package org.apache.cassandra.spark.bulkwriter;
import java.util.UUID;
+import
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf;
import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
import org.apache.cassandra.spark.data.QualifiedTableName;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
public class CassandraJobInfo implements JobInfo
{
@@ -105,6 +107,13 @@ public class CassandraJobInfo implements JobInfo
return conf.importCoordinatorTimeoutMultiplier;
}
+ @Nullable
+ @Override
+ public CoordinatedWriteConf coordinatedWriteConf()
+ {
+ return conf.coordinatedWriteConf();
+ }
+
@Override
public int getCommitThreadsPerInstance()
{
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java
index b6ed979..c6fe930 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java
@@ -27,6 +27,7 @@ import
o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse;
import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.validation.StartupValidatable;
+import org.jetbrains.annotations.Nullable;
public interface ClusterInfo extends StartupValidatable, Serializable
{
@@ -47,10 +48,29 @@ public interface ClusterInfo extends StartupValidatable,
Serializable
TimeSkewResponse getTimeSkew(List<RingInstance> replicas);
+ /**
+ * Return the keyspace schema string of the enclosing keyspace for bulk
write in the cluster
+ * @param cached whether using the cached schema information
+ * @return keyspace schema string
+ */
String getKeyspaceSchema(boolean cached);
CassandraContext getCassandraContext();
+ /**
+ * ID string that can uniquely identify a cluster
+ * <p>
+ * Implementor note: the method is optional. When writing to a single
cluster, there is no requirement of assigning an ID for bulk write to proceed.
+ * When in the coordinated write mode, i.e. writing to multiple clusters,
the method must be implemented and return unique string for clusters.
+ *
+ * @return cluster id string, null if absent
+ */
+ @Nullable
+ default String clusterId()
+ {
+ return null;
+ }
+
default void close()
{
}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
index a454827..496580e 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
@@ -22,6 +22,7 @@ package org.apache.cassandra.spark.bulkwriter;
import java.io.Serializable;
import java.util.UUID;
+import
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf;
import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
import org.apache.cassandra.spark.data.QualifiedTableName;
import org.jetbrains.annotations.NotNull;
@@ -100,4 +101,10 @@ public interface JobInfo extends Serializable
* @return multiplier to calculate the final timeout for import coordinator
*/
double importCoordinatorTimeoutMultiplier();
+
+ /**
+ * @return CoordinatedWriteConf if configured, null otherwise
+ */
+ @Nullable
+ CoordinatedWriteConf coordinatedWriteConf();
}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
index ae04447..f0edb71 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
@@ -25,6 +25,8 @@ import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;
+import com.google.common.annotations.VisibleForTesting;
+
import
o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaMetadata;
import o.a.c.sidecar.client.shaded.common.response.data.RingEntry;
import org.apache.cassandra.spark.common.model.CassandraInstance;
@@ -36,14 +38,11 @@ public class RingInstance implements CassandraInstance,
Serializable
{
private static final long serialVersionUID = 4399143234683369652L;
private RingEntry ringEntry;
+ private @Nullable String clusterId;
- public RingInstance(RingEntry ringEntry)
- {
- this.ringEntry = ringEntry;
- }
-
- public RingInstance(ReplicaMetadata replica)
+ public RingInstance(ReplicaMetadata replica, @Nullable String clusterId)
{
+ this.clusterId = clusterId;
this.ringEntry = new RingEntry.Builder()
.fqdn(replica.fqdn())
.address(replica.address())
@@ -54,6 +53,25 @@ public class RingInstance implements CassandraInstance,
Serializable
.build();
}
+ @VisibleForTesting
+ public RingInstance(RingEntry ringEntry)
+ {
+ this(ringEntry, null);
+ }
+
+ @VisibleForTesting
+ public RingInstance(RingEntry ringEntry, @Nullable String clusterId)
+ {
+ this.clusterId = clusterId;
+ this.ringEntry = ringEntry;
+ }
+
+ @VisibleForTesting
+ public RingInstance(ReplicaMetadata replica)
+ {
+ this(replica, null);
+ }
+
// Used only in tests
@Override
public String token()
@@ -61,6 +79,12 @@ public class RingInstance implements CassandraInstance,
Serializable
return ringEntry.token();
}
+ @Override
+ public String clusterId()
+ {
+ return clusterId;
+ }
+
@Override
public String nodeName()
{
@@ -98,9 +122,9 @@ public class RingInstance implements CassandraInstance,
Serializable
}
/**
- * Custom equality that compares the token, fully qualified domain name,
the port, and the datacenter
+ * Custom equality that compares the token, fully qualified domain name,
the port, the datacenter and the clusterId
*
- * Note that node state and status are not part of the calculation.
+ * Note that node state, status, are not part of the calculation.
*
* @param other the other instance
* @return true if both instances are equal, false otherwise
@@ -108,13 +132,20 @@ public class RingInstance implements CassandraInstance,
Serializable
@Override
public boolean equals(@Nullable Object other)
{
- if (other == null || !(other instanceof RingInstance))
+ if (this == other)
+ {
+ return true;
+ }
+
+ if (other == null || getClass() != other.getClass())
{
return false;
}
final RingInstance that = (RingInstance) other;
- return Objects.equals(ringEntry.token(), that.ringEntry.token())
+ return Objects.equals(clusterId, that.clusterId)
+ && Objects.equals(ringEntry.token(), that.ringEntry.token())
&& Objects.equals(ringEntry.fqdn(), that.ringEntry.fqdn())
+ && Objects.equals(ringEntry.rack(), that.ringEntry.rack())
&& Objects.equals(ringEntry.address(), that.ringEntry.address())
&& ringEntry.port() == that.ringEntry.port()
&& Objects.equals(ringEntry.datacenter(),
that.ringEntry.datacenter());
@@ -130,7 +161,7 @@ public class RingInstance implements CassandraInstance,
Serializable
@Override
public int hashCode()
{
- return Objects.hash(ringEntry.token(), ringEntry.fqdn(),
ringEntry.port(), ringEntry.datacenter(), ringEntry.address());
+ return Objects.hash(clusterId, ringEntry.token(), ringEntry.fqdn(),
ringEntry.rack(), ringEntry.port(), ringEntry.datacenter(),
ringEntry.address());
}
@Override
@@ -139,7 +170,7 @@ public class RingInstance implements CassandraInstance,
Serializable
return ringEntry.toString();
}
- public RingEntry ringInstance()
+ public RingEntry ringEntry()
{
return ringEntry;
}
@@ -158,6 +189,7 @@ public class RingInstance implements CassandraInstance,
Serializable
out.writeObject(ringEntry.hostId());
out.writeObject(ringEntry.load());
out.writeObject(ringEntry.owns());
+ out.writeObject(clusterId);
}
private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException
@@ -174,6 +206,7 @@ public class RingInstance implements CassandraInstance,
Serializable
String hostId = (String) in.readObject();
String load = (String) in.readObject();
String owns = (String) in.readObject();
+ String clusterId = (String) in.readObject();
ringEntry = new RingEntry.Builder().datacenter(datacenter)
.address(address)
.port(port)
@@ -186,5 +219,6 @@ public class RingInstance implements CassandraInstance,
Serializable
.load(load)
.owns(owns)
.build();
+ this.clusterId = clusterId;
}
}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
index 6c7eba7..1b36bd0 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
@@ -31,6 +31,12 @@ public enum WriterOptions implements WriterOption
// The option specifies the initial contact points of sidecar servers to
discover the cluster topology
// Note that the addresses can include port; when port is present, it
takes precedence over SIDECAR_PORT
SIDECAR_CONTACT_POINTS,
+ /**
+ * The option specifies the configuration (in JSON) for coordinated write.
+ * See
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf.
+ * When the option is present, SIDECAR_CONTACT_POINTS, SIDECAR_INSTANCES
and LOCAL_DC are ignored if they are present.
+ */
+ COORDINATED_WRITE_CONFIG,
KEYSPACE,
TABLE,
BULK_WRITER_CL,
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConf.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConf.java
new file mode 100644
index 0000000..63fc017
--- /dev/null
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConf.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter.coordinatedwrite;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+import org.apache.cassandra.sidecar.client.SidecarInstance;
+import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
+import org.apache.cassandra.spark.common.SidecarInstanceFactory;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toSet;
+
+/**
+ * Data class containing the configurations required for coordinated write, i.e. writing to multiple
+ * Cassandra clusters. The serialization format is a JSON object that maps a cluster ID to the cluster
+ * configuration. The class takes care of serialization and deserialization. For example, with
+ * {@link SimpleClusterConf}:
+ * <pre>{@code
+ * {"cluster1": {"sidecarContactPoints": ["instance-1:9999"], "localDc": "dc1"}}
+ * }</pre>
+ */
+public class CoordinatedWriteConf
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(CoordinatedWriteConf.class);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    // The runtime type of the ClusterConf values is erased; use clustersOf to read the desired type back
+    private final Map<String, ClusterConf> clusters;
+
+    /**
+     * Parses a JSON string and creates a CoordinatedWriteConf whose cluster entries are deserialized as the
+     * specified ClusterConf type. Each cluster is then validated against the consistency level.
+     *
+     * @param json             JSON string; an object mapping cluster ID to cluster configuration
+     * @param consistencyLevel consistency level of the write; local levels require every cluster to define localDc
+     * @param clusterConfType  concrete ClusterConf type used for JSON deserialization
+     * @param <T>              subtype of ClusterConf
+     * @return CoordinatedWriteConf object
+     * @throws IllegalArgumentException if the JSON cannot be parsed into the requested type,
+     *                                  or if it contains a null cluster configuration
+     * @throws IllegalStateException    if the consistency level is local and a cluster has no localDc
+     */
+    public static <T extends ClusterConf>
+    CoordinatedWriteConf create(String json, ConsistencyLevel.CL consistencyLevel, Class<T> clusterConfType)
+    {
+        JavaType javaType = TypeFactory.defaultInstance().constructMapType(Map.class, String.class, clusterConfType);
+        CoordinatedWriteConf result;
+        try
+        {
+            result = new CoordinatedWriteConf(OBJECT_MAPPER.readValue(json, javaType));
+        }
+        catch (Exception e)
+        {
+            throw new IllegalArgumentException("Unable to parse json string into CoordinatedWriteConf of " + clusterConfType.getSimpleName() +
+                                               " due to " + e.getMessage(), e);
+        }
+        result.clusters().forEach((clusterId, cluster) -> {
+            // A JSON null value (e.g. {"c1": null}) deserializes to a null entry; reject it with a clear message
+            Preconditions.checkArgument(cluster != null, "Cluster configuration is null for cluster: %s", clusterId);
+            if (consistencyLevel.isLocal())
+            {
+                Preconditions.checkState(!StringUtils.isEmpty(cluster.localDc()),
+                                         "localDc is not configured for cluster: " + clusterId + " for consistency level: " + consistencyLevel);
+            }
+            else if (cluster.localDc() != null)
+            {
+                LOGGER.warn("Ignoring the localDc configured for cluster, when consistency level is non-local. cluster={} consistencyLevel={}",
+                            clusterId, consistencyLevel);
+            }
+        });
+        return result;
+    }
+
+    /**
+     * @param clusters cluster configurations keyed by cluster ID; wrapped in an unmodifiable view (not copied)
+     */
+    public CoordinatedWriteConf(Map<String, ? extends ClusterConf> clusters)
+    {
+        this.clusters = Collections.unmodifiableMap(clusters);
+    }
+
+    /**
+     * @return unmodifiable view of all cluster configurations keyed by cluster ID
+     */
+    public Map<String, ClusterConf> clusters()
+    {
+        return clusters;
+    }
+
+    /**
+     * @param clusterId cluster ID
+     * @return the configuration of the cluster, or null if no cluster has the given ID
+     */
+    @Nullable
+    public ClusterConf cluster(String clusterId)
+    {
+        return clusters.get(clusterId);
+    }
+
+    /**
+     * Returns the cluster configurations typed as the given ClusterConf subtype.
+     *
+     * @param clusterConfType expected concrete type of every cluster configuration
+     * @param <T>             subtype of ClusterConf
+     * @return the cluster configurations keyed by cluster ID
+     * @throws IllegalStateException if any value is not an instance of clusterConfType
+     */
+    public <T extends ClusterConf> Map<String, T> clustersOf(Class<T> clusterConfType)
+    {
+        // Verify every value before the unchecked cast; there are only a few clusters, so the check is cheap
+        clusters.values().forEach(v -> Preconditions.checkState(clusterConfType.isInstance(v),
+                                                                "ClusterConfProvider value is not instance of " + clusterConfType));
+        @SuppressWarnings("unchecked") // safe: every value was verified to be an instance of T above
+        Map<String, T> typed = (Map<String, T>) clusters;
+        return typed;
+    }
+
+    /**
+     * @return the JSON representation of the cluster configurations
+     * @throws JsonProcessingException if serialization fails
+     */
+    public String toJson() throws JsonProcessingException
+    {
+        return OBJECT_MAPPER.writeValueAsString(clusters);
+    }
+
+    /**
+     * Configuration of a single cluster participating in the coordinated write.
+     */
+    public interface ClusterConf
+    {
+        /**
+         * @return sidecar instances used as contact points for the cluster
+         */
+        Set<SidecarInstance> sidecarContactPoints();
+
+        /**
+         * @return the local datacenter configured for the cluster, or null if absent
+         */
+        @Nullable
+        String localDc();
+
+        /**
+         * Resolves the local datacenter to use for the given consistency level.
+         *
+         * @param cl consistency level of the write
+         * @return the configured localDc for local consistency levels; always null for non-local consistency levels
+         * @throws IllegalStateException if the consistency level is local and no localDc is configured
+         */
+        @Nullable
+        default String resolveLocalDc(ConsistencyLevel.CL cl)
+        {
+            if (!cl.isLocal())
+            {
+                // localDc is ignored for non-local consistency levels (including an empty string value)
+                return null;
+            }
+            String localDc = localDc();
+            if (StringUtils.isEmpty(localDc))
+            {
+                throw new IllegalStateException("No localDc is specified for local consistency level: " + cl);
+            }
+            return localDc;
+        }
+    }
+
+    /**
+     * ClusterConf that is (de)serialized from/to JSON with the properties "sidecarContactPoints"
+     * (list of sidecar addresses; each must include the port) and the optional "localDc".
+     */
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    public static class SimpleClusterConf implements ClusterConf
+    {
+        private final List<String> sidecarContactPointsValue;
+        private final Set<SidecarInstance> sidecarContactPoints;
+        private final @Nullable String localDc;
+
+        /**
+         * @param sidecarContactPointsValue sidecar addresses with port; empty entries are skipped
+         * @param localDc                   local datacenter of the cluster; may be null
+         * @throws IllegalArgumentException if sidecarContactPointsValue is null
+         */
+        @JsonCreator
+        public SimpleClusterConf(@JsonProperty("sidecarContactPoints") List<String> sidecarContactPointsValue,
+                                 @JsonProperty("localDc") String localDc)
+        {
+            Preconditions.checkArgument(sidecarContactPointsValue != null, "sidecarContactPoints is required");
+            this.sidecarContactPointsValue = sidecarContactPointsValue;
+            this.sidecarContactPoints = buildSidecarContactPoints(sidecarContactPointsValue);
+            this.localDc = localDc;
+        }
+
+        @JsonProperty("sidecarContactPoints")
+        public List<String> sidecarContactPointsValue()
+        {
+            return sidecarContactPointsValue;
+        }
+
+        @Nullable
+        @Override
+        @JsonProperty("localDc")
+        public String localDc()
+        {
+            return localDc;
+        }
+
+        @Override
+        public Set<SidecarInstance> sidecarContactPoints()
+        {
+            return sidecarContactPoints;
+        }
+
+        // Converts non-empty "host:port" strings into an unmodifiable set of sidecar instances
+        private static Set<SidecarInstance> buildSidecarContactPoints(List<String> sidecarContactPoints)
+        {
+            return sidecarContactPoints.stream()
+                                       .filter(StringUtils::isNotEmpty)
+                                       .map(SidecarInstanceFactory::createFromString)
+                                       .collect(collectingAndThen(toSet(), Collections::unmodifiableSet));
+        }
+    }
+}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/SidecarInstanceFactory.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/SidecarInstanceFactory.java
index cc6fb09..ce69703 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/SidecarInstanceFactory.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/SidecarInstanceFactory.java
@@ -58,7 +58,19 @@ public class SidecarInstanceFactory
port = Integer.parseInt(portStr);
}
+ Preconditions.checkState(port != -1, "Unable to resolve port from %s",
input);
+
LOGGER.info("Create sidecar instance. hostname={} port={}", hostname,
port);
return new SidecarInstanceImpl(hostname, port);
}
+
+ /**
+ * Similar to {@link SidecarInstanceFactory#createFromString(String,
int)}, but it requires that the input string must include port
+ * @param hostnameWithPort hostname with port
+ * @return SidecarInstanceImpl
+ */
+ public static SidecarInstanceImpl createFromString(String hostnameWithPort)
+ {
+ return createFromString(hostnameWithPort, -1);
+ }
}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 5e26deb..369e58e 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -124,7 +124,7 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements StartupV
// create clusterConfig from sidecarInstances and sidecarPort, see
initializeClusterConfig
protected String sidecarInstances;
protected int sidecarPort;
- protected transient Set<? extends SidecarInstance> clusterConfig;
+ protected transient Set<SidecarInstance> clusterConfig;
protected TokenPartitioner tokenPartitioner;
protected Map<String, AvailabilityHint> availabilityHints;
protected Sidecar.ClientConfig sidecarClientConfig;
@@ -907,13 +907,13 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements StartupV
}
}
- protected Set<? extends SidecarInstance>
initializeClusterConfig(ClientConfig options)
+ protected Set<SidecarInstance> initializeClusterConfig(ClientConfig
options)
{
return initializeClusterConfig(options.sidecarContactPoints,
options.sidecarPort());
}
// not intended to be overridden
- private Set<? extends SidecarInstance> initializeClusterConfig(String
sidecarInstances, int sidecarPort)
+ private Set<SidecarInstance> initializeClusterConfig(String
sidecarInstances, int sidecarPort)
{
return Arrays.stream(sidecarInstances.split(","))
.filter(StringUtils::isNotEmpty)
@@ -921,7 +921,7 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements StartupV
.collect(Collectors.toSet());
}
- protected String getEffectiveCassandraVersionForRead(Set<? extends
SidecarInstance> clusterConfig,
+ protected String getEffectiveCassandraVersionForRead(Set<SidecarInstance>
clusterConfig,
NodeSettings
nodeSettings)
{
return nodeSettings.releaseVersion();
@@ -932,7 +932,7 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements StartupV
LOGGER.info("Dial home. clientConfig={}", options);
}
- protected void clearSnapshot(Set<? extends SidecarInstance> clusterConfig,
@NotNull ClientConfig options)
+ protected void clearSnapshot(Set<SidecarInstance> clusterConfig, @NotNull
ClientConfig options)
{
if (maybeQuotedKeyspace == null || maybeQuotedTable == null)
{
@@ -982,7 +982,7 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements StartupV
* @param options the {@link ClientConfig} options
* @return the {@link Sizing} object based on the {@code sizing} option
provided by the user
*/
- protected Sizing getSizing(Set<? extends SidecarInstance> clusterConfig,
+ protected Sizing getSizing(Set<SidecarInstance> clusterConfig,
ReplicationFactor replicationFactor,
ClientConfig options)
{
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
index d48ace8..3130fc1 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
@@ -28,6 +28,8 @@ import com.google.common.collect.Maps;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
+import
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf;
import org.apache.cassandra.spark.bulkwriter.util.SbwKryoRegistrator;
import org.apache.cassandra.spark.utils.BuildInfo;
import org.apache.spark.SparkConf;
@@ -275,6 +277,70 @@ class BulkSparkConfTest
.isNull();
}
+ @Test
+ void testReadCoordinatedWriteConfFails()
+ {
+ Map<String, String> options = copyDefaultOptions();
+ String coordinatedWriteConfJsonNoLocalDc = "{\"cluster1\":" +
+
"{\"sidecarContactPoints\":[\"instance-1:9999\",\"instance-2:9999\",\"instance-3:9999\"]}}";
+
+ options.put(WriterOptions.COORDINATED_WRITE_CONFIG.name(),
coordinatedWriteConfJsonNoLocalDc);
+ assertThatThrownBy(() -> new BulkSparkConf(sparkConf, options))
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Coordinated write only supports S3_COMPAT");
+
+ options.put(WriterOptions.DATA_TRANSPORT.name(),
DataTransport.S3_COMPAT.name());
+ options.put(WriterOptions.BULK_WRITER_CL.name(), "LOCAL_QUORUM");
+ assertThatThrownBy(() -> new BulkSparkConf(sparkConf, options))
+ .isExactlyInstanceOf(IllegalStateException.class)
+ .hasMessage("localDc is not configured for cluster: cluster1 for
consistency level: LOCAL_QUORUM");
+
+ options.put(WriterOptions.COORDINATED_WRITE_CONFIG.name(), "invalid
json");
+ assertThatThrownBy(() -> new BulkSparkConf(sparkConf, options))
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Unable to parse json string into
CoordinatedWriteConf of SimpleClusterConf due to Unrecognized token 'invalid'");
+ }
+
+ @Test
+ void testCoordinatedWriteConf()
+ {
+ Map<String, String> options = copyDefaultOptions();
+ options.remove(WriterOptions.COORDINATED_WRITE_CONFIG.name());
+ BulkSparkConf conf = new BulkSparkConf(sparkConf, options);
+ assertThat(conf.isCoordinatedWriteConfigured())
+ .describedAs("When COORDINATED_WRITE_CONF is absent,
isCoordinatedWriteConfigured should return false")
+ .isFalse();
+
+ String coordinatedWriteConfJson = "{\"cluster1\":" +
+
"{\"sidecarContactPoints\":[\"instance-1:9999\",\"instance-2:9999\",\"instance-3:9999\"],"
+
+ "\"localDc\":\"dc1\"}," +
+ "\"cluster2\":" +
+
"{\"sidecarContactPoints\":[\"instance-4:8888\",\"instance-5:8888\",\"instance-6:8888\"],"
+
+ "\"localDc\":\"dc1\"}}";
+ options.put(WriterOptions.DATA_TRANSPORT.name(),
DataTransport.S3_COMPAT.name());
+ options.put(WriterOptions.BULK_WRITER_CL.name(), "LOCAL_QUORUM");
+ options.put(WriterOptions.COORDINATED_WRITE_CONFIG.name(),
coordinatedWriteConfJson);
+ conf = new BulkSparkConf(sparkConf, options);
+ assertThat(conf.isCoordinatedWriteConfigured())
+ .describedAs("When COORDINATED_WRITE_CONF is present, it should return
true")
+ .isTrue();
+
assertThat(conf.coordinatedWriteConf().clusters()).containsOnlyKeys("cluster1",
"cluster2");
+ CoordinatedWriteConf.ClusterConf cluster1 =
conf.coordinatedWriteConf().cluster("cluster1");
+ assertThat(cluster1).isNotNull();
+ assertThat(cluster1.sidecarContactPoints())
+ .containsExactlyInAnyOrder(new SidecarInstanceImpl("instance-1", 9999),
+ new SidecarInstanceImpl("instance-2", 9999),
+ new SidecarInstanceImpl("instance-3",
9999));
+ assertThat(cluster1.localDc()).isEqualTo("dc1");
+ CoordinatedWriteConf.ClusterConf cluster2 =
conf.coordinatedWriteConf().cluster("cluster2");
+ assertThat(cluster2).isNotNull();
+ assertThat(cluster2.sidecarContactPoints())
+ .containsExactlyInAnyOrder(new SidecarInstanceImpl("instance-4", 8888),
+ new SidecarInstanceImpl("instance-5", 8888),
+ new SidecarInstanceImpl("instance-6",
8888));
+ assertThat(cluster2.localDc()).isEqualTo("dc1");
+ }
+
private Map<String, String> copyDefaultOptions()
{
TreeMap<String, String> map = new
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java
index 5428363..59d7705 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java
@@ -62,9 +62,7 @@ class BulkWriteValidatorTest
for (RingInstance instance : topology.getTokenRanges().keySet())
{
// Mark nodes 0, 1, 2 as DOWN
- int nodeId = Integer.parseInt(instance.ipAddress()
- .replace("localhost", "")
- .replace(":9042", ""));
+ int nodeId =
Integer.parseInt(instance.ipAddress().replace("localhost", ""));
instanceAvailabilityMap.put(instance, (nodeId <= 2) ?
WriteAvailability.UNAVAILABLE_DOWN : WriteAvailability.AVAILABLE);
}
when(mockClusterInfo.clusterWriteAvailability()).thenReturn(instanceAvailabilityMap);
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java
index f84280f..578781b 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java
@@ -494,6 +494,7 @@ class ImportCompletionCoordinatorTest
int instanceInRing = i % totalInstances + 1;
return new RingInstance(new RingEntry.Builder()
.datacenter("DC1")
+ .rack("Rack")
.address("127.0.0." + instanceInRing)
.token(String.valueOf(i * 100_000))
.fqdn("DC1-i" + instanceInRing)
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index d93a217..33b09a2 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -46,6 +46,7 @@ import
o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse;
import org.apache.cassandra.bridge.CassandraBridge;
import org.apache.cassandra.bridge.CassandraBridgeFactory;
import org.apache.cassandra.bridge.CassandraVersion;
+import
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf;
import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
@@ -63,6 +64,7 @@ import org.apache.cassandra.spark.validation.StartupValidator;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DATE;
import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.INT;
@@ -210,6 +212,12 @@ public class MockBulkWriterContext implements
BulkWriterContext, ClusterInfo, Jo
return null;
}
+ @Override
+ public String clusterId()
+ {
+ return "test-cluster";
+ }
+
@Override
public void refreshClusterInfo()
{
@@ -303,6 +311,13 @@ public class MockBulkWriterContext implements
BulkWriterContext, ClusterInfo, Jo
return 2.0;
}
+ @Nullable
+ @Override
+ public CoordinatedWriteConf coordinatedWriteConf()
+ {
+ return null;
+ }
+
public void setSkipCleanOnFailures(boolean skipClean)
{
this.skipClean = skipClean;
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceSerializationTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceSerializationTest.java
index d15db89..333eabb 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceSerializationTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceSerializationTest.java
@@ -69,7 +69,7 @@ public class RingInstanceSerializationTest
7000,
dataCenter);
- RingInstance ring = new RingInstance(metadata);
+ RingInstance ring = new RingInstance(metadata, "test-cluster");
byte[] bytes = serialize(ring);
RingInstance deserialized = deserialize(bytes, RingInstance.class);
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
index da2f1af..0846e8f 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
@@ -31,21 +31,23 @@ import java.util.List;
import java.util.Map;
import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Range;
import org.junit.jupiter.api.Test;
import o.a.c.sidecar.client.shaded.common.response.data.RingEntry;
-import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
+import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel.CL;
import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.NotNull;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class RingInstanceTest
{
@@ -123,58 +125,75 @@ public class RingInstanceTest
@Test
public void testEquals()
{
- RingInstance instance1 = TokenRangeMappingUtils.getInstances(0,
ImmutableMap.of("DATACENTER1", 1), 1).get(0);
- RingInstance instance2 = TokenRangeMappingUtils.getInstances(0,
ImmutableMap.of("DATACENTER1", 1), 1).get(0);
+ RingInstance instance1 = mockRingInstance();
+ RingInstance instance2 = mockRingInstance();
assertEquals(instance1, instance2);
}
@Test
public void testHashCode()
{
- RingInstance instance1 = TokenRangeMappingUtils.getInstances(0,
ImmutableMap.of("DATACENTER1", 1), 1).get(0);
- RingInstance instance2 = TokenRangeMappingUtils.getInstances(0,
ImmutableMap.of("DATACENTER1", 1), 1).get(0);
+ RingInstance instance1 = mockRingInstance();
+ RingInstance instance2 = mockRingInstance();
assertEquals(instance1.hashCode(), instance2.hashCode());
}
@Test
- public void testEqualsAndHashcodeIgnoreHost()
- {
- RingInstance realInstance = new RingInstance(new RingEntry.Builder()
- .datacenter("DATACENTER1")
- .address("127.0.0.1")
- .port(7000)
- .rack("Rack")
- .status("UP")
- .state("NORMAL")
- .load("0")
- .token("0")
- .fqdn("fqdn")
- .hostId("")
- .owns("")
- .build());
-
- RingInstance questionInstance = new RingInstance(new
RingEntry.Builder()
-
.datacenter("DATACENTER1")
- .address("127.0.0.1")
- .port(7000)
- .rack("Rack")
- .status("UP")
- .state("NORMAL")
- .load("0")
- .token("0")
- .fqdn("fqdn")
- .hostId("")
- .owns("")
- .build());
- assertEquals(realInstance, questionInstance);
- assertEquals(realInstance.hashCode(), questionInstance.hashCode());
+ public void testEqualsAndHashcodeIgnoreNonCriticalFields()
+ {
+ RingEntry.Builder builder = mockRingEntryBuilder();
+ // the fields chained in the builder below are not considered for
equality check
+ RingInstance instance1 = new RingInstance(builder
+ .status("1")
+ .state("1")
+ .load("1")
+ .hostId("1")
+ .owns("1")
+ .build());
+ RingInstance instance2 = new RingInstance(builder
+ .status("2")
+ .state("2")
+ .load("2")
+ .hostId("2")
+ .owns("2")
+ .build());
+ assertEquals(instance1, instance2);
+ assertEquals(instance1.hashCode(), instance2.hashCode());
+ }
+
+ @Test
+ public void testEqualsAndHashcodeConsidersClusterId()
+ {
+ RingEntry ringEntry = mockRingEntry();
+ RingInstance c1i1 = new RingInstance(ringEntry, "cluster1");
+ RingInstance c1i2 = new RingInstance(ringEntry, "cluster1");
+ RingInstance c2i1 = new RingInstance(ringEntry, "cluster2");
+
+ assertEquals(c1i1, c1i2);
+ assertEquals(c1i1.hashCode(), c1i2.hashCode());
+
+ assertNotEquals(c1i1, c2i1);
+ assertNotEquals(c1i1.hashCode(), c2i1.hashCode());
+ }
+
+ @Test
+ public void testHasClusterId()
+ {
+ RingEntry ringEntry = mockRingEntry();
+ RingInstance instance = new RingInstance(ringEntry);
+ assertFalse(instance.hasClusterId());
+
+ RingInstance instanceWithClusterId = new RingInstance(ringEntry,
"cluster1");
+ assertTrue(instanceWithClusterId.hasClusterId());
+ assertEquals("cluster1", instanceWithClusterId.clusterId());
}
@Test
public void multiMapWorksWithRingInstances()
{
- RingInstance instance1 = TokenRangeMappingUtils.getInstances(0,
ImmutableMap.of("DATACENTER1", 1), 1).get(0);
- RingInstance instance2 = TokenRangeMappingUtils.getInstances(0,
ImmutableMap.of("DATACENTER1", 1), 1).get(0);
+ RingEntry ringEntry = mockRingEntry();
+ RingInstance instance1 = new RingInstance(ringEntry);
+ RingInstance instance2 = new RingInstance(ringEntry);
byte[] buffer;
try
@@ -224,7 +243,35 @@ public class RingInstanceTest
replicationFactor3.addFailure(Range.openClosed(tokens[0].add(BigInteger.ONE),
tokens[0].add(BigInteger.valueOf(2L))), instance2, "Failure 2");
- replicationFactor3.getFailedRanges(tokenRange,
ConsistencyLevel.CL.LOCAL_QUORUM, DATACENTER_1);
- assertFalse(replicationFactor3.hasFailed(tokenRange,
ConsistencyLevel.CL.LOCAL_QUORUM, DATACENTER_1));
+ assertTrue(replicationFactor3.getFailedRanges(tokenRange,
CL.LOCAL_QUORUM, DATACENTER_1).isEmpty());
+ }
+
+ @NotNull
+ private static RingEntry mockRingEntry()
+ {
+ return mockRingEntryBuilder().build();
+ }
+
+ @NotNull
+ private static RingEntry.Builder mockRingEntryBuilder()
+ {
+ return new RingEntry.Builder()
+ .datacenter("DATACENTER1")
+ .address("127.0.0.1")
+ .port(0)
+ .rack("Rack")
+ .status("UP")
+ .state("NORMAL")
+ .load("0")
+ .token("0")
+ .fqdn("DATACENTER1-i1")
+ .hostId("")
+ .owns("");
+ }
+
+ @NotNull
+ private static RingInstance mockRingInstance()
+ {
+ return new RingInstance(mockRingEntry());
}
}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
index 9c53279..202d3a3 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
@@ -61,7 +61,7 @@ public final class TokenRangeMappingUtils
{
List<RingInstance> instances = getInstances(initialToken, rfByDC,
instancesPerDC);
RingInstance instance = instances.remove(0);
- RingEntry entry = instance.ringInstance();
+ RingEntry entry = instance.ringEntry();
RingEntry newEntry = new RingEntry.Builder()
.datacenter(entry.datacenter())
.port(entry.port())
@@ -97,7 +97,7 @@ public final class TokenRangeMappingUtils
if (shouldUpdateToken)
{
RingInstance instance = instances.remove(0);
- RingEntry entry = instance.ringInstance();
+ RingEntry entry = instance.ringEntry();
RingEntry newEntry = new RingEntry.Builder()
.datacenter(entry.datacenter())
.port(entry.port())
@@ -218,9 +218,11 @@ public final class TokenRangeMappingUtils
replicasPerDc.put("ignored", replicas);
ReplicaInfo ri = new ReplicaInfo(String.valueOf(startToken),
String.valueOf(endToken), replicasPerDc);
replicaInfoList.add(ri);
- String address = "localhost" + i + ":9042";
+ String address = "localhost" + i;
+ int port = 9042;
+ String addressWithPort = address + ":" + port;
ReplicaMetadata rm = new ReplicaMetadata("NORMAL", "UP", address,
address, 9042, "ignored");
- replicaMetadata.put(address, rm);
+ replicaMetadata.put(addressWithPort, rm);
startToken = endToken;
}
return new TokenRangeReplicasResponse(replicaInfoList,
replicaInfoList, replicaMetadata);
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConfTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConfTest.java
new file mode 100644
index 0000000..be15a6a
--- /dev/null
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConfTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter.coordinatedwrite;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.cassandra.sidecar.client.SidecarInstance;
+import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
+import
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf.ClusterConf;
+import
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf.SimpleClusterConf;
+import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel.CL;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class CoordinatedWriteConfTest
+{
+ @Test
+ void testSerDeser() throws JsonProcessingException
+ {
+ Map<String, SimpleClusterConf> clusters = new HashMap<>();
+ clusters.put("cluster1", new
SimpleClusterConf(Arrays.asList("instance-1:9999", "instance-2:9999",
"instance-3:9999"), "dc1"));
+ clusters.put("cluster2", new
SimpleClusterConf(Arrays.asList("instance-4:8888", "instance-5:8888",
"instance-6:8888"), "dc1"));
+ CoordinatedWriteConf conf = new CoordinatedWriteConf(clusters);
+ String json = conf.toJson();
+ assertThat(json)
+ .isEqualTo("{\"cluster1\":" +
+
"{\"sidecarContactPoints\":[\"instance-1:9999\",\"instance-2:9999\",\"instance-3:9999\"],"
+
+ "\"localDc\":\"dc1\"}," +
+ "\"cluster2\":" +
+
"{\"sidecarContactPoints\":[\"instance-4:8888\",\"instance-5:8888\",\"instance-6:8888\"],"
+
+ "\"localDc\":\"dc1\"}}");
+ CoordinatedWriteConf deser = CoordinatedWriteConf.create(json,
CL.LOCAL_QUORUM, SimpleClusterConf.class);
+ assertThat(deser.clusters()).containsKeys("cluster1", "cluster2");
+
assertThat(deser.clustersOf(SimpleClusterConf.class).get("cluster1").sidecarContactPointsValue())
+ .isEqualTo(Arrays.asList("instance-1:9999", "instance-2:9999",
"instance-3:9999"));
+
assertThat(deser.clustersOf(SimpleClusterConf.class).get("cluster2").sidecarContactPointsValue())
+ .isEqualTo(Arrays.asList("instance-4:8888", "instance-5:8888",
"instance-6:8888"));
+
assertThat(deser.clusters().get("cluster1").localDc()).isEqualTo("dc1");
+
assertThat(deser.clusters().get("cluster2").localDc()).isEqualTo("dc1");
+ Set<SidecarInstance> contactPoints =
deser.clusters().get("cluster1").sidecarContactPoints();
+ assertThat(contactPoints)
+ .hasSize(3);
+ // assertj contains method does not compile with SidecarInstanceImpl
due to type erasure
+ assertThat(contactPoints.contains(new
SidecarInstanceImpl("instance-1", 9999))).isTrue();
+ assertThat(contactPoints.contains(new
SidecarInstanceImpl("instance-2", 9999))).isTrue();
+ assertThat(contactPoints.contains(new
SidecarInstanceImpl("instance-3", 9999))).isTrue();
+ contactPoints =
deser.clusters().get("cluster2").sidecarContactPoints();
+ assertThat(contactPoints)
+ .hasSize(3);
+ assertThat(contactPoints.contains(new
SidecarInstanceImpl("instance-4", 8888))).isTrue();
+ assertThat(contactPoints.contains(new
SidecarInstanceImpl("instance-5", 8888))).isTrue();
+ assertThat(contactPoints.contains(new
SidecarInstanceImpl("instance-6", 8888))).isTrue();
+ }
+
+ @Test
+ void testDeserFailsWhenInstanceHasNoPort()
+ {
+ String json = "{\"cluster1\":" +
+
"{\"sidecarContactPoints\":[\"instance-1\",\"instance-2\",\"instance-3\"]," +
+ "\"localDc\":\"dc1\"}}";
+ assertThatThrownBy(() -> CoordinatedWriteConf.create(json,
CL.LOCAL_QUORUM, SimpleClusterConf.class))
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Unable to parse json string into
CoordinatedWriteConf of SimpleClusterConf")
+ .hasRootCauseExactlyInstanceOf(IllegalStateException.class)
+ .hasRootCauseMessage("Unable to resolve port from instance-1");
+ }
+
+ @Test
+ void testDeserFailsDueToMissingLocalDcWithNonLocalCL()
+ {
+ String json =
"{\"cluster1\":{\"sidecarContactPoints\":[\"instance-1:8888\"]}}";
+ assertThatThrownBy(() -> CoordinatedWriteConf.create(json,
CL.LOCAL_QUORUM, SimpleClusterConf.class))
+ .isExactlyInstanceOf(IllegalStateException.class)
+ .hasMessage("localDc is not configured for cluster: cluster1 for
consistency level: LOCAL_QUORUM");
+ }
+
+ @Test
+ void testResolveLocalDc()
+ {
+ ClusterConf clusterWithLocalDc = new
SimpleClusterConf(Collections.singletonList("instance-1:9999"), "dc1");
+ assertThat(clusterWithLocalDc.resolveLocalDc(CL.EACH_QUORUM))
+ .describedAs("Resolving localDc with Non-local CL should return null")
+ .isNull();
+ assertThat(clusterWithLocalDc.resolveLocalDc(CL.LOCAL_QUORUM))
+ .describedAs("Resolving localDc with local CL should return the actual
localDc")
+ .isEqualTo("dc1");
+ ClusterConf clusterWithoutLocalDc = new
SimpleClusterConf(Collections.singletonList("instance-1:9999"), null);
+
assertThat(clusterWithoutLocalDc.resolveLocalDc(CL.EACH_QUORUM)).isNull();
+ assertThatThrownBy(() ->
clusterWithoutLocalDc.resolveLocalDc(CL.LOCAL_QUORUM))
+ .isExactlyInstanceOf(IllegalStateException.class)
+ .hasMessage("No localDc is specified for local consistency level: " +
CL.LOCAL_QUORUM);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]