ifesdjeen commented on code in PR #4244:
URL: https://github.com/apache/cassandra/pull/4244#discussion_r2201737063
##########
src/java/org/apache/cassandra/service/accord/WatermarkCollector.java:
##########
@@ -139,32 +149,49 @@ static void fetchAndReportWatermarksAsync(AccordConfigurationService configServi
Snapshot snapshot = m.payload;
long minEpoch = configService.minEpoch();
- for (Map.Entry<Range, Long> e : snapshot.closed.entrySet())
- {
- Ranges r = Ranges.of(e.getKey());
- configService.receiveClosed(r, e.getValue());
- }
- for (Map.Entry<Range, Long> e : snapshot.retired.entrySet())
- {
- Ranges r = Ranges.of(e.getKey());
- configService.receiveRetired(r, e.getValue());
- }
- for (Map.Entry<Integer, Long> e : snapshot.synced.entrySet())
+ snapshot.retired.sort(sortByEpochThenRange);
Review Comment:
This explicit sort looks redundant: `forEachEpoch` just below sorts `snapshot.retired` with the same comparator.
##########
src/java/org/apache/cassandra/service/accord/WatermarkCollector.java:
##########
@@ -139,32 +149,49 @@ static void fetchAndReportWatermarksAsync(AccordConfigurationService configServi
Snapshot snapshot = m.payload;
long minEpoch = configService.minEpoch();
- for (Map.Entry<Range, Long> e : snapshot.closed.entrySet())
- {
- Ranges r = Ranges.of(e.getKey());
- configService.receiveClosed(r, e.getValue());
- }
- for (Map.Entry<Range, Long> e : snapshot.retired.entrySet())
- {
- Ranges r = Ranges.of(e.getKey());
- configService.receiveRetired(r, e.getValue());
- }
- for (Map.Entry<Integer, Long> e : snapshot.synced.entrySet())
+ snapshot.retired.sort(sortByEpochThenRange);
+
+ forEachEpoch(configService::receiveClosed, snapshot.closed);
+ forEachEpoch(configService::receiveRetired, snapshot.retired);
+ for (Map.Entry<Long, Long> e : snapshot.synced.entrySet())
{
- Node.Id node = new Node.Id(e.getKey());
+ Node.Id node = new Node.Id(Ints.saturatedCast(e.getKey()));
for (long epoch = minEpoch; epoch <= e.getValue(); epoch++)
configService.receiveRemoteSyncComplete(node, epoch);
}
});
}
+ private static void forEachEpoch(BiConsumer<Ranges, Long> forEachEpoch, List<Map.Entry<Range, Long>> rangesAndEpochs)
+ {
+ if (rangesAndEpochs.isEmpty())
+ return;
+
+ rangesAndEpochs.sort(sortByEpochThenRange);
+ long collectingEpoch = rangesAndEpochs.get(0).getValue();
+ List<Range> ranges = new ArrayList<>();
+ for (Map.Entry<Range, Long> e : rangesAndEpochs)
+ {
+ Range range = e.getKey();
+ long epoch = e.getValue();
+ if (epoch != collectingEpoch)
+ {
+ forEachEpoch.accept(Ranges.of(ranges.toArray(Range[]::new)), collectingEpoch);
+ collectingEpoch = epoch;
+ ranges.clear();
+ }
Review Comment:
As discussed on Slack, should this be:
```
if (epoch != collectingEpoch)
{
    forEachEpoch.accept(Ranges.of(ranges.toArray(Range[]::new)), collectingEpoch);
    collectingEpoch = epoch;
    ranges.clear();
}
ranges.add(range);
```
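For reference, a minimal standalone sketch of the grouping this suggestion is aiming for. It is not the PR's code: ranges are plain strings rather than Accord `Range` objects, the class name `EpochGrouping` and the comparator are hypothetical, and the final flush after the loop is an assumption, since the hunk above is cut off before the end of the method.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class EpochGrouping
{
    // Hypothetical stand-in for sortByEpochThenRange: epoch first, then range.
    private static final Comparator<Map.Entry<String, Long>> BY_EPOCH_THEN_RANGE =
        Comparator.comparingLong((Map.Entry<String, Long> e) -> e.getValue())
                  .thenComparing(e -> e.getKey());

    // Calls the consumer once per distinct epoch with all ranges seen for that epoch.
    // Note: sorts the given list in place, so it must be mutable.
    public static void forEachEpoch(BiConsumer<List<String>, Long> consumer,
                                    List<Map.Entry<String, Long>> rangesAndEpochs)
    {
        if (rangesAndEpochs.isEmpty())
            return;

        // After sorting, entries with the same epoch are adjacent.
        rangesAndEpochs.sort(BY_EPOCH_THEN_RANGE);
        long collectingEpoch = rangesAndEpochs.get(0).getValue();
        List<String> ranges = new ArrayList<>();
        for (Map.Entry<String, Long> e : rangesAndEpochs)
        {
            long epoch = e.getValue();
            if (epoch != collectingEpoch)
            {
                // Epoch changed: report what was collected so far, then start over.
                consumer.accept(new ArrayList<>(ranges), collectingEpoch);
                collectingEpoch = epoch;
                ranges.clear();
            }
            // Every entry is collected, including the one that triggered the flush.
            ranges.add(e.getKey());
        }
        // Assumption: the last group still needs to be reported after the loop.
        consumer.accept(new ArrayList<>(ranges), collectingEpoch);
    }
}
```

Adding the range unconditionally after the `if` keeps the entry that starts a new epoch in the next group, so no range is dropped at an epoch boundary.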
##########
src/java/org/apache/cassandra/service/accord/WatermarkCollector.java:
##########
@@ -81,16 +89,15 @@ public class WatermarkCollector implements ConfigurationService.Listener
@Override
public void onRemoteSyncComplete(Node.Id node, long epoch)
{
- synced.compute(node.id, (k, prev) -> prev == null ? epoch : Long.max(prev, epoch));
+ synced.compute(node.id, (k, prev) -> prev == -1 ? epoch : Long.max(prev, epoch));
Review Comment:
I'm only realizing this now, but should we wrap this update in a synchronized block?
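A rough sketch of what I have in mind, under the assumption that `synced` is now a map that is not thread-safe and uses `-1` as its "no value" marker. The class `SyncedWatermarks`, the `HashMap` and the `NONE` constant below are hypothetical stand-ins, not the actual fields of `WatermarkCollector`.

```java
import java.util.HashMap;
import java.util.Map;

public class SyncedWatermarks
{
    // Hypothetical sentinel for "no epoch recorded yet", mirroring the -1 check in the diff.
    private static final long NONE = -1L;

    // Stand-in for the real map; a plain HashMap is not safe for concurrent updates.
    private final Map<Integer, Long> synced = new HashMap<>();

    // Records the highest epoch seen for a node; the read-modify-write is guarded by the lock.
    public void onRemoteSyncComplete(int nodeId, long epoch)
    {
        synchronized (synced)
        {
            long prev = synced.getOrDefault(nodeId, NONE);
            synced.put(nodeId, prev == NONE ? epoch : Math.max(prev, epoch));
        }
    }

    // Readers take the same lock so they never observe a map that is mid-update.
    public long syncedEpoch(int nodeId)
    {
        synchronized (synced)
        {
            return synced.getOrDefault(nodeId, NONE);
        }
    }
}
```

If the map were a `ConcurrentHashMap` instead, `merge(nodeId, epoch, Long::max)` would give the same result atomically without an explicit lock.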
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]