aweisberg commented on code in PR #3842:
URL: https://github.com/apache/cassandra/pull/3842#discussion_r1932793214
##########
src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java:
##########
@@ -407,7 +407,11 @@ void maybeReportMetadata(ClusterMetadata metadata)
long epoch = metadata.epoch.getEpoch();
synchronized (epochs)
{
- if (epochs.maxEpoch() == 0)
+ long maxEpoch = epochs.maxEpoch();
+ if (maxEpoch >= epoch)
Review Comment:
Does Accord knowing about the epoch guarantee that TCM has already loaded
it? We don't want to skip the TCM loading step. If we no longer indirectly call
`fetchTopologyInternal`, we need something else to ensure TCM has loaded it.
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -410,48 +413,67 @@ public synchronized void startup()
}
/**
- * Queries peers to discover min epoch
+ * Queries peers to discover min epoch, and then fetches all topologies
between min and current epochs
*/
- private long fetchMinEpoch()
+ private List<Topology> fetchTopologies(long minEpoch, ClusterMetadata
metadata) throws ExecutionException, InterruptedException
{
- ClusterMetadata metadata = ClusterMetadata.current();
- Map<InetAddressAndPort, Set<TokenRange>> peers = new HashMap<>();
- for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces())
+ if (configService.maxEpoch() >= metadata.epoch.getEpoch())
{
- List<TableMetadata> tables =
keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList());
- if (tables.isEmpty())
- continue;
- DataPlacement current =
metadata.placements.get(keyspace.params.replication);
- DataPlacement settled =
metadata.writePlacementAllSettled(keyspace);
- Sets.SetView<InetAddressAndPort> alive =
Sets.intersection(settled.writes.byEndpoint().keySet(),
current.writes.byEndpoint().keySet());
- InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
- settled.writes.forEach((range, group) -> {
- if (group.endpoints().contains(self))
- {
- for (InetAddressAndPort peer : group.endpoints())
- {
- if (peer.equals(self) || !alive.contains(peer))
continue;
- for (TableMetadata table : tables)
- peers.computeIfAbsent(peer, i -> new
HashSet<>()).add(AccordTopology.fullRange(table.id));
- }
- }
- });
+ logger.info("Accord epoch {} matches TCM. All topologies are known
locally", metadata.epoch);
+ return
Collections.singletonList(AccordTopology.createAccordTopology(metadata));
}
+
+ Set<InetAddressAndPort> peers = new HashSet<>();
+ peers.addAll(metadata.directory.allAddresses());
+ peers.remove(FBUtilities.getBroadcastAddressAndPort());
+
+ // No peers: single node cluster or first node to boot
if (peers.isEmpty())
- return -1;
+ return
Collections.singletonList(AccordTopology.createAccordTopology(metadata));;
+
+ // Bootstrap, fetch min epoch
+ if (minEpoch == 0)
+ {
+ long fetched = findMinEpoch(SharedContext.Global.instance, peers);
+ // No other node has advanced epoch just yet
+ if (fetched == 0)
+ return
Collections.singletonList(AccordTopology.createAccordTopology(metadata));
+
+ minEpoch = fetched;
+ }
+
+ long maxEpoch = metadata.epoch.getEpoch();
+
+ // If we are behind minEpoch, catch up to at least minEpoch
+ if (metadata.epoch.getEpoch() < minEpoch)
+ {
+ minEpoch = metadata.epoch.getEpoch();
+ maxEpoch = minEpoch;
+ }
+
+ List<Future<Topology>> futures = new ArrayList<>();
+ logger.info("Discovered min epoch of {}. Proceeding to fetch epochs up
to {}.", minEpoch, maxEpoch);
+
+ for (long epoch = minEpoch; epoch <= maxEpoch; epoch++)
+ futures.add(FetchTopology.fetch(SharedContext.Global.instance,
peers, epoch));
+
+ FBUtilities.waitOnFutures(futures);
+ List<Topology> topologies = new ArrayList<>(futures.size());
+ for (Future<Topology> future : futures)
+ topologies.add(future.get());
- Long minEpoch = findMinEpoch(SharedContext.Global.instance, peers);
- if (minEpoch == null)
- return -1;
- return minEpoch;
+ return topologies;
}
@VisibleForTesting
- static Long findMinEpoch(SharedContext context, Map<InetAddressAndPort,
Set<TokenRange>> peers)
+ static long findMinEpoch(SharedContext context, Set<InetAddressAndPort>
peers)
{
try
{
- return FetchMinEpoch.fetch(context, peers).get();
+ Long result = FetchMinEpoch.fetch(context, peers).get();
+ if (result == null)
+ return 0L;
+ return result.longValue();
Review Comment:
Won't this unbox automatically?
##########
src/java/org/apache/cassandra/service/accord/FetchTopology.java:
##########
@@ -123,18 +116,20 @@ public Response(long epoch, Topology topology)
long epoch = message.payload.epoch;
Topology topology =
AccordService.instance().topology().maybeGlobalForEpoch(epoch);
if (topology == null)
- MessagingService.instance().respond(Response.UNKNOWN, message);
+ MessagingService.instance().respond(Response.unkonwn(epoch),
message);
else
MessagingService.instance().respond(new Response(epoch, topology),
message);
};
+ private static final Logger logger =
LoggerFactory.getLogger(FetchTopology.class);
Review Comment:
Shouldn't `logger` be declared at the top of the class? It's also unused.
##########
src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java:
##########
@@ -145,21 +108,22 @@ public static Future<Long> fetch(SharedContext context,
Map<InetAddressAndPort,
}
@VisibleForTesting
- static Future<Long> fetch(SharedContext context, InetAddressAndPort to,
Set<TokenRange> value)
+ static Future<Long> fetch(SharedContext context, InetAddressAndPort to)
{
- FetchMinEpoch req = new FetchMinEpoch(value);
- return context.messaging().<FetchMinEpoch,
FetchMinEpoch.Response>sendWithRetries(Backoff.NO_OP.INSTANCE,
-
MessageDelivery.ImmediateRetryScheduler.instance,
-
Verb.ACCORD_FETCH_MIN_EPOCH_REQ, req,
+ Backoff backoff = Backoff.fromConfig(context,
DatabaseDescriptor.getAccord().minEpochSyncRetry);
+ return context.messaging().<FetchMinEpoch,
FetchMinEpoch.Response>sendWithRetries(backoff,
+
context.optionalTasks()::schedule,
+
Verb.ACCORD_FETCH_MIN_EPOCH_REQ,
+
FetchMinEpoch.instance,
Iterators.cycle(to),
-
RetryPredicate.times(DatabaseDescriptor.getAccord().minEpochSyncRetry.maxAttempts.value),
+
RetryPredicate.ALWAYS_RETRY,
Review Comment:
Does this mean that fetching the min epoch requires all nodes to be up in
order to complete? Looking at how the caller of `fetch` accumulates these: it
combines all the futures and can't complete until every future completes, so
wouldn't any down node stop this from working?
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -377,24 +371,33 @@ public synchronized void startup()
node.commandStores().restoreShardStateUnsafe(topology ->
configService.reportTopology(topology, true, true));
configService.start();
- long minEpoch = fetchMinEpoch();
- if (minEpoch >= 0)
+ try
{
- for (long epoch = minEpoch; epoch <= metadata.epoch.getEpoch();
epoch++)
- node.configService().fetchTopologyForEpoch(epoch);
+ // Fetch topologies up to current
+ List<Topology> topologies = fetchTopologies(0, metadata);
Review Comment:
Maybe make 0 a constant indicating that it is actually supposed to find the
`minEpoch` to fetch?
##########
src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java:
##########
@@ -420,14 +424,17 @@ void maybeReportMetadata(ClusterMetadata metadata)
@Override
protected void fetchTopologyInternal(long epoch)
{
+ if (ClusterMetadata.current().epoch.getEpoch() < epoch)
+
ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(epoch));
+
try
{
Set<InetAddressAndPort> peers = new
HashSet<>(ClusterMetadata.current().directory.allJoinedEndpoints());
peers.remove(FBUtilities.getBroadcastAddressAndPort());
if (peers.isEmpty())
return;
Topology topology;
- while ((topology
=FetchTopology.fetch(SharedContext.Global.instance, peers, epoch).get()) ==
null) {}
+ while ((topology =
FetchTopology.fetch(SharedContext.Global.instance, peers, epoch).get()) ==
null) {}
Review Comment:
Why do we need to run `FetchTopology` here if we would indirectly get the
topology from
`ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(epoch));`?
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -410,48 +413,67 @@ public synchronized void startup()
}
/**
- * Queries peers to discover min epoch
+ * Queries peers to discover min epoch, and then fetches all topologies
between min and current epochs
*/
- private long fetchMinEpoch()
+ private List<Topology> fetchTopologies(long minEpoch, ClusterMetadata
metadata) throws ExecutionException, InterruptedException
{
- ClusterMetadata metadata = ClusterMetadata.current();
- Map<InetAddressAndPort, Set<TokenRange>> peers = new HashMap<>();
- for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces())
+ if (configService.maxEpoch() >= metadata.epoch.getEpoch())
{
- List<TableMetadata> tables =
keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList());
- if (tables.isEmpty())
- continue;
- DataPlacement current =
metadata.placements.get(keyspace.params.replication);
- DataPlacement settled =
metadata.writePlacementAllSettled(keyspace);
- Sets.SetView<InetAddressAndPort> alive =
Sets.intersection(settled.writes.byEndpoint().keySet(),
current.writes.byEndpoint().keySet());
- InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
- settled.writes.forEach((range, group) -> {
- if (group.endpoints().contains(self))
- {
- for (InetAddressAndPort peer : group.endpoints())
- {
- if (peer.equals(self) || !alive.contains(peer))
continue;
- for (TableMetadata table : tables)
- peers.computeIfAbsent(peer, i -> new
HashSet<>()).add(AccordTopology.fullRange(table.id));
- }
- }
- });
+ logger.info("Accord epoch {} matches TCM. All topologies are known
locally", metadata.epoch);
Review Comment:
I don't know why this is true so I am just going to have to take it on faith.
##########
src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java:
##########
@@ -420,14 +424,17 @@ void maybeReportMetadata(ClusterMetadata metadata)
@Override
protected void fetchTopologyInternal(long epoch)
{
+ if (ClusterMetadata.current().epoch.getEpoch() < epoch)
Review Comment:
I had to put this on the migration stage on my branch.
##########
test/distributed/org/apache/cassandra/distributed/test/accord/AccordDropKeyspaceTest.java:
##########
@@ -35,8 +34,7 @@ public void dropKeyspace() throws IOException
int steps = 5;
try (Cluster cluster = Cluster.build(3)
.withoutVNodes()
- .withConfig(c -> c.with(Feature.values())
Review Comment:
When I merged, I set it explicitly to `GOSSIP, NETWORK, NATIVE_PROTOCOL`, but
we should probably go back to `values()` if the tests run fine, now that we
know that `BLANK_GOSSIP` is ignored.
##########
src/java/org/apache/cassandra/service/accord/FetchTopology.java:
##########
@@ -123,18 +116,20 @@ public Response(long epoch, Topology topology)
long epoch = message.payload.epoch;
Topology topology =
AccordService.instance().topology().maybeGlobalForEpoch(epoch);
if (topology == null)
- MessagingService.instance().respond(Response.UNKNOWN, message);
+ MessagingService.instance().respond(Response.unkonwn(epoch),
message);
Review Comment:
Is this for test code that can catch the actual exception?
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -410,48 +413,67 @@ public synchronized void startup()
}
/**
- * Queries peers to discover min epoch
+ * Queries peers to discover min epoch, and then fetches all topologies
between min and current epochs
*/
- private long fetchMinEpoch()
+ private List<Topology> fetchTopologies(long minEpoch, ClusterMetadata
metadata) throws ExecutionException, InterruptedException
{
- ClusterMetadata metadata = ClusterMetadata.current();
- Map<InetAddressAndPort, Set<TokenRange>> peers = new HashMap<>();
- for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces())
+ if (configService.maxEpoch() >= metadata.epoch.getEpoch())
{
- List<TableMetadata> tables =
keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList());
- if (tables.isEmpty())
- continue;
- DataPlacement current =
metadata.placements.get(keyspace.params.replication);
- DataPlacement settled =
metadata.writePlacementAllSettled(keyspace);
- Sets.SetView<InetAddressAndPort> alive =
Sets.intersection(settled.writes.byEndpoint().keySet(),
current.writes.byEndpoint().keySet());
- InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
- settled.writes.forEach((range, group) -> {
- if (group.endpoints().contains(self))
- {
- for (InetAddressAndPort peer : group.endpoints())
- {
- if (peer.equals(self) || !alive.contains(peer))
continue;
- for (TableMetadata table : tables)
- peers.computeIfAbsent(peer, i -> new
HashSet<>()).add(AccordTopology.fullRange(table.id));
- }
- }
- });
+ logger.info("Accord epoch {} matches TCM. All topologies are known
locally", metadata.epoch);
+ return
Collections.singletonList(AccordTopology.createAccordTopology(metadata));
}
+
+ Set<InetAddressAndPort> peers = new HashSet<>();
+ peers.addAll(metadata.directory.allAddresses());
+ peers.remove(FBUtilities.getBroadcastAddressAndPort());
+
+ // No peers: single node cluster or first node to boot
if (peers.isEmpty())
- return -1;
+ return
Collections.singletonList(AccordTopology.createAccordTopology(metadata));;
+
+ // Bootstrap, fetch min epoch
+ if (minEpoch == 0)
Review Comment:
OK, that makes 0 sneaky. I guess 0 is the empty epoch, so we never need to
fetch it? It might be better/clearer to use a boxed `Long` with `null`, or a named constant.
##########
test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java:
##########
@@ -271,21 +223,21 @@ private static MessageDelivery.FailedResponseException
getFailedResponseExceptio
}
catch (ExecutionException e)
{
- if (e.getCause() instanceof
MessageDelivery.FailedResponseException)
+ if (e.getCause() instanceof MessageDelivery.MaxRetriesException)
{
- exception = (MessageDelivery.FailedResponseException)
e.getCause();
+ maxRetries = (MessageDelivery.MaxRetriesException)
e.getCause();
}
else
{
throw e;
}
}
- return exception;
+ return maxRetries;
}
- private static MessageDelivery.MaxRetriesException
getMaxRetriesException(Future<Long> f) throws InterruptedException,
ExecutionException
+ private static MessageDelivery.FailedResponseException
getFailedResponseException(Future<Long> f) throws InterruptedException,
ExecutionException
Review Comment:
This method is unused.
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -410,48 +413,67 @@ public synchronized void startup()
}
/**
- * Queries peers to discover min epoch
+ * Queries peers to discover min epoch, and then fetches all topologies
between min and current epochs
*/
- private long fetchMinEpoch()
+ private List<Topology> fetchTopologies(long minEpoch, ClusterMetadata
metadata) throws ExecutionException, InterruptedException
{
- ClusterMetadata metadata = ClusterMetadata.current();
- Map<InetAddressAndPort, Set<TokenRange>> peers = new HashMap<>();
- for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces())
+ if (configService.maxEpoch() >= metadata.epoch.getEpoch())
{
- List<TableMetadata> tables =
keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList());
- if (tables.isEmpty())
- continue;
- DataPlacement current =
metadata.placements.get(keyspace.params.replication);
- DataPlacement settled =
metadata.writePlacementAllSettled(keyspace);
- Sets.SetView<InetAddressAndPort> alive =
Sets.intersection(settled.writes.byEndpoint().keySet(),
current.writes.byEndpoint().keySet());
- InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
- settled.writes.forEach((range, group) -> {
- if (group.endpoints().contains(self))
- {
- for (InetAddressAndPort peer : group.endpoints())
- {
- if (peer.equals(self) || !alive.contains(peer))
continue;
- for (TableMetadata table : tables)
- peers.computeIfAbsent(peer, i -> new
HashSet<>()).add(AccordTopology.fullRange(table.id));
- }
- }
- });
+ logger.info("Accord epoch {} matches TCM. All topologies are known
locally", metadata.epoch);
+ return
Collections.singletonList(AccordTopology.createAccordTopology(metadata));
}
+
+ Set<InetAddressAndPort> peers = new HashSet<>();
+ peers.addAll(metadata.directory.allAddresses());
+ peers.remove(FBUtilities.getBroadcastAddressAndPort());
+
+ // No peers: single node cluster or first node to boot
if (peers.isEmpty())
- return -1;
+ return
Collections.singletonList(AccordTopology.createAccordTopology(metadata));;
+
+ // Bootstrap, fetch min epoch
+ if (minEpoch == 0)
+ {
+ long fetched = findMinEpoch(SharedContext.Global.instance, peers);
+ // No other node has advanced epoch just yet
+ if (fetched == 0)
+ return
Collections.singletonList(AccordTopology.createAccordTopology(metadata));
+
+ minEpoch = fetched;
+ }
+
+ long maxEpoch = metadata.epoch.getEpoch();
+
+ // If we are behind minEpoch, catch up to at least minEpoch
+ if (metadata.epoch.getEpoch() < minEpoch)
+ {
+ minEpoch = metadata.epoch.getEpoch();
+ maxEpoch = minEpoch;
+ }
+
+ List<Future<Topology>> futures = new ArrayList<>();
+ logger.info("Discovered min epoch of {}. Proceeding to fetch epochs up
to {}.", minEpoch, maxEpoch);
+
+ for (long epoch = minEpoch; epoch <= maxEpoch; epoch++)
+ futures.add(FetchTopology.fetch(SharedContext.Global.instance,
peers, epoch));
Review Comment:
This is going to send a flood of requests if you have to collect a lot of
these. Maybe not an issue if it's just one node.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]