aweisberg commented on code in PR #4220:
URL: https://github.com/apache/cassandra/pull/4220#discussion_r2210827616
##########
src/java/org/apache/cassandra/batchlog/BatchlogManager.java:
##########
@@ -431,7 +431,7 @@ public boolean replay(RateLimiter rateLimiter, Set<UUID>
hintedNodes) throws IOE
if (accordMutations != null)
{
accordTxnStart =
accordTxnStart.withStartedAt(Clock.Global.nanoTime());
- accordResult = accordMutations != null ?
mutateWithAccordAsync(cm, accordMutations, null, accordTxnStart) : null;
+ accordResult = accordMutations != null ?
mutateWithAccordAsync(cm, accordMutations, null, accordTxnStart, true) : null;
Review Comment:
Can you make `preserveTimestamps` an enum? Then we can document the enum
values, and it will be easy to discover what it's all about anywhere the enum
is used.
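A minimal sketch of what such an enum could look like. The names `TimestampHandling`, `PRESERVE`, `USE_TRANSACTION_TIMESTAMP`, and `fromBoolean` are hypothetical and not taken from the PR; the Javadoc wording is only an assumption about what each value would document.

```java
/**
 * Hypothetical replacement for the boolean preserveTimestamps parameter.
 * All names here are illustrative and do not exist in the PR.
 */
enum TimestampHandling
{
    /** Keep the timestamps already carried by the mutation, e.g. on batchlog replay. */
    PRESERVE,

    /** Allow the transactional path to apply its own timestamp. */
    USE_TRANSACTION_TIMESTAMP;

    /** Bridge for call sites that still pass a boolean. */
    static TimestampHandling fromBoolean(boolean preserveTimestamps)
    {
        return preserveTimestamps ? PRESERVE : USE_TRANSACTION_TIMESTAMP;
    }

    boolean preserves()
    {
        return this == PRESERVE;
    }
}
```

With something like this, a call site would read `mutateWithAccordAsync(cm, accordMutations, null, accordTxnStart, TimestampHandling.PRESERVE)` instead of a bare `true`.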
##########
src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java:
##########
@@ -86,23 +88,26 @@ public class TxnWrite extends
AbstractKeySorted<TxnWrite.Update> implements Writ
public static final TxnWrite EMPTY_CONDITION_FAILED = new
TxnWrite(TableMetadatas.none(), Collections.emptyList(), false);
private static final long EMPTY_SIZE =
ObjectSizes.measure(EMPTY_CONDITION_FAILED);
+ private static final int FLAG_USER_TIMESTAMP = 0x01;
public static class Update extends
AbstractParameterisedVersionedSerialized<PartitionUpdate, TableMetadatas>
{
- private static final long EMPTY_SIZE = ObjectSizes.measure(new
Update(null, 0, ByteBufferUtil.EMPTY_BYTE_BUFFER));
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new
Update(null, 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, false));
public final PartitionKey key;
public final int index;
+ public final boolean userTimestamp;
- public Update(PartitionKey key, int index, PartitionUpdate update,
TableMetadatas tables)
+ public Update(PartitionKey key, int index, PartitionUpdate update,
TableMetadatas tables, boolean userTimestamp)
Review Comment:
I don't think this is the right granularity for `userTimestamp`, because it's
a whole partition update, which is multiple rows, some of which might preserve
the timestamp and some of which might not.
We already capture what to do with timestamps with `preserveTimestamps` for
the whole partition update, so isn't it redundant to add `userTimestamp`?
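To illustrate the granularity concern, here is a small sketch with a hypothetical `Row` type (a stand-in, not a Cassandra class). It only shows that a single boolean per update cannot describe an update in which some rows carry a user timestamp and others do not.

```java
import java.util.Arrays;
import java.util.List;

final class TimestampGranularityDemo
{
    // Hypothetical stand-in for one row of a partition update.
    static final class Row
    {
        final String clustering;
        final boolean userTimestamp;

        Row(String clustering, boolean userTimestamp)
        {
            this.clustering = clustering;
            this.userTimestamp = userTimestamp;
        }
    }

    static boolean anyUserTimestamp(List<Row> rows)
    {
        return rows.stream().anyMatch(r -> r.userTimestamp);
    }

    static boolean allUserTimestamp(List<Row> rows)
    {
        return rows.stream().allMatch(r -> r.userTimestamp);
    }

    // A mixed update: whichever value a single flag takes, it is wrong for one row.
    static List<Row> mixedUpdate()
    {
        return Arrays.asList(new Row("a", true), new Row("b", false));
    }
}
```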
##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -1248,20 +1249,22 @@ public static void mutateWithTriggers(List<? extends
IMutation> mutations,
if (augmented != null || mutateAtomically || updatesView)
mutateAtomically(augmented != null ? augmented :
(List<Mutation>)mutations, consistencyLevel, updatesView, requestTime);
else
- dispatchMutationsWithRetryOnDifferentSystem(mutations,
consistencyLevel, requestTime);
+ dispatchMutationsWithRetryOnDifferentSystem(mutations,
consistencyLevel, requestTime, preserveTimestamps);
}
- public static void dispatchMutationsWithRetryOnDifferentSystem(List<?
extends IMutation> mutations, ConsistencyLevel consistencyLevel,
Dispatcher.RequestTime requestTime)
+ public static void dispatchMutationsWithRetryOnDifferentSystem(List<?
extends IMutation> mutations, ConsistencyLevel consistencyLevel,
Dispatcher.RequestTime requestTime, boolean preserveTimestamps)
{
while (true)
{
ClusterMetadata cm = ClusterMetadata.current();
try
{
- SplitMutations splitMutations =
splitMutationsIntoAccordAndNormal(cm, (List<IMutation>)mutations);
+ SplitMutations<?> splitMutations =
splitMutationsIntoAccordAndNormal(cm, (List<IMutation>)mutations);
List<? extends IMutation> accordMutations =
splitMutations.accordMutations();
- IAccordResult<TxnResult> accordResult = accordMutations !=
null ? mutateWithAccordAsync(cm, accordMutations, consistencyLevel,
requestTime) : null;
List<? extends IMutation> normalMutations =
splitMutations.normalMutations();
+ if (!preserveTimestamps && normalMutations != null)
Review Comment:
Add a comment explaining that if there was ever any attempt to apply part of
the mutation using the eventually consistent path, then we need to continue to
use the timestamp used by the eventually consistent path so we don't end up
with multiple timestamps, but if it only ever used the transactional path, then
we can use the transactional timestamp to get linearizability.
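The rule described above can be sketched as a tiny decision helper. `TimestampChoice`, `shouldPreserve`, and the `attemptedEventuallyConsistentPath` parameter are hypothetical names used only for illustration; the PR's actual control flow may differ.

```java
final class TimestampChoice
{
    /**
     * Hypothetical helper, not part of the PR.
     *
     * If any part of the mutation was ever attempted on the eventually
     * consistent path, keep using that path's timestamp so the write never
     * ends up with multiple timestamps. Only when the transactional path
     * was used exclusively can the transactional timestamp be applied.
     */
    static boolean shouldPreserve(boolean preserveTimestamps, boolean attemptedEventuallyConsistentPath)
    {
        return preserveTimestamps || attemptedEventuallyConsistentPath;
    }
}
```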
##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -86,6 +87,7 @@ public class Mutation implements IMutation, Supplier<Mutation>
volatile long viewLockAcquireStart;
private final boolean cdcEnabled;
+ private final boolean userTimestamp;
Review Comment:
I don't think this flag makes sense or has utility. Only the coordinator
needs to make the call about whether to preserve the timestamp, so it doesn't
need to be serialized and carried with the mutation.
Batchlog and hints always preserve timestamps, so they don't need it
serialized.
The granularity isn't enough to do anything useful at the coordinator, and
`mutateWithTriggers` already has this information passed in as a parameter. I
really don't think it's worth the noise in `Mutation` if it's never going to be
usable for anything.
##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -552,17 +570,18 @@ private Serialization serialization(Mutation mutation,
int version)
}
static void
serializeInternal(PartitionUpdate.PartitionUpdateSerializer serializer,
- Mutation mutation,
- DataOutputPlus out,
- int version) throws IOException
+ Mutation mutation,
+ DataOutputPlus out,
+ int version) throws IOException
{
Map<TableId, PartitionUpdate> modifications =
mutation.modifications;
if (version >= VERSION_51)
{
int flags = 0;
flags |=
potentialTxnConflictsFlag(mutation.potentialTxnConflicts);
- out.write(flags);
+ flags |= userTimestampFlag(mutation.userTimestamp);
+ out.writeUnsignedVInt32(flags);
Review Comment:
Using a vint is fine, but `write` with a 32-bit int writes 1 byte, not 4.
With versioning of serialization, we don't really need to burn instructions on
a vint unless we want to add flags without bumping the version?
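A quick way to see the 1-byte behaviour, using plain `java.io.DataOutputStream` rather than Cassandra's `DataOutputPlus`. This assumes `DataOutputPlus.write(int)` follows the standard `java.io.DataOutput` contract; `FlagsWidthDemo` and its methods are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class FlagsWidthDemo
{
    // Number of bytes produced by DataOutput.write(int): only the low-order 8 bits are written.
    static int bytesWrittenByWrite(int flags)
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes))
        {
            out.write(flags);
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Number of bytes produced by DataOutput.writeInt(int): always the full 4 bytes.
    static int bytesWrittenByWriteInt(int flags)
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes))
        {
            out.writeInt(flags);
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }
}
```

So as long as the flags fit in 8 bits, the original `out.write(flags)` already costs a single byte on the wire.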
##########
src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java:
##########
@@ -262,17 +266,18 @@ public static IAccordResult<TxnResult>
mutateWithAccordAsync(ClusterMetadata cm,
long minEpoch = Epoch.EMPTY.getEpoch();
for (IMutation mutation : mutations)
{
+ boolean userTimestamp = mutation instanceof Mutation &&
((Mutation) mutation).isUserTimestamp();
Review Comment:
Again, this isn't a useful granularity. In practice, a mutation is not the
grouping on which `userTimestamp` is set; it's per row.
We don't need this intermediate step that only works with some CQL but not
others, adds a lot of noise, and needs to be replaced anyway.
Just the flag to `mutateWithTriggers` is enough to either error out on user
timestamps or preserve them. Partially linearizable batches are just wonky, and
what this amount of code achieves isn't really worth it when it needs to be
replaced anyway.
##########
src/java/org/apache/cassandra/cql3/statements/BatchStatement.java:
##########
@@ -357,7 +357,7 @@ public List<? extends IMutation> getMutations(ClientState
state,
}
// local is either executeWithoutConditions modifying a virtual table
(doesn't support txns) or executeLocal
// which is called by test or internal things that are bypassing
distributed system modification/checks
- return collector.toMutations(state, local ?
PotentialTxnConflicts.ALLOW : PotentialTxnConflicts.DISALLOW);
+ return collector.toMutations(state, local ?
PotentialTxnConflicts.ALLOW : PotentialTxnConflicts.DISALLOW,
attrs.isTimestampSet());
Review Comment:
I don't think it will ever be that useful. We pass in whether to preserve
timestamps to `mutateWithTriggers`, and I think that is enough. We don't need
to serialize it with the mutation because it's only needed for batchlog and
hints, where it is flipped to true anyway by necessity.
Anything that fully addresses it will need per-statement granularity for
preserving timestamps.
Anything that partially addresses it can use the boolean passed to
`mutateWithTriggers` to signal an error in cases where the timestamp wouldn't
be preserved.
##########
test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java:
##########
@@ -206,10 +211,55 @@ protected <S extends BaseState>
Property.StatefulSuccess<S, Void> onSuccess(Logg
});
}
+ protected static <S extends CommonState> Property.Command<S, Void, ?>
validateUsingTimestamp(RandomSource rs, S state)
+ {
+ if (state.operations == 0)
+ return ignoreCommand();
+ var builder = Select.builder(state.metadata);
+ for (var c : state.model.factory.regularAndStaticColumns)
+ builder.selection(FunctionCall.writetime(c));
+ ByteBuffer upperboundTimestamp = LongType.instance.decompose((long)
state.operations);
+ var select = builder.build();
+ var inst = state.selectInstance(rs);
+ return new Property.SimpleCommand<>(state.humanReadable(select, null),
s -> {
+ var result = s.executeQuery(inst, Integer.MAX_VALUE, s.selectCl(),
select);
+ for (var row : result)
+ {
+ for (var col : state.model.factory.regularAndStaticColumns)
+ {
+ int idx =
state.model.factory.regularAndStaticColumns.indexOf(col);
+ ByteBuffer value = row[idx];
+ if (value == null) continue;
+ if (col.type().isMultiCell())
+ {
+ List<ByteBuffer> timestamps =
LONG_LIST_TYPE.unpack(value);
+ int cellIndex = 0;
+ for (var timestamp : timestamps)
+ {
+
Assertions.assertThat(LongType.instance.compare(timestamp, upperboundTimestamp))
+ .describedAs("Unexected timestamp at
multi-cell index %s for col %s: %s > %s", cellIndex, col,
LongType.instance.compose(timestamp), state.operations)
+ .isLessThanOrEqualTo(state.operations);
+ cellIndex++;
+ }
+ }
+ else
+ {
+ Assertions.assertThat(LongType.instance.compare(value,
upperboundTimestamp))
+ .describedAs("Unexected timestamp for col
%s: %s > %s", col, LongType.instance.compose(value), state.operations)
+ .isLessThanOrEqualTo(state.operations);
+ }
+ }
+ }
+ });
+ }
+
protected static <S extends CommonState> Property.Command<S, Void, ?>
insert(RandomSource rs, S state)
{
int timestamp = ++state.operations;
- Mutation mutation =
state.mutationGen().next(rs).withTimestamp(timestamp);
+ Mutation original = state.mutationGen().next(rs);
+ Mutation mutation = state.allowUsingTimestamp()
+ ? original.withTimestamp(timestamp)
Review Comment:
This is changing the timestamp of the entire mutation and not of specific rows
in it, which is why the implementation driven by this test satisfies the test
but doesn't actually work correctly for real-world unlogged single-partition
batches, which can have different rows with different timestamps.
##########
test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedTime.java:
##########
@@ -274,7 +274,7 @@ public long nanoTime()
nextDrift = nanosDriftSupplier.get(random);
from = global;
to = global + Math.max(baseDrift, nextDrift);
- diffPerGlobal = (nextDrift - baseDrift) / (double)(to - from);
+ diffPerGlobal = to == from ? 1 : (nextDrift - baseDrift) /
(double)(to - from);
Review Comment:
I spent 5 minutes looking at this and didn't grok what `diffPerGlobal` is
doing.
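For what it's worth, the arithmetic the new guard avoids can be reproduced in isolation. This sketch does not explain the intent of `diffPerGlobal`; it only shows what the unguarded expression evaluates to when `to == from`, assuming the operands are `long`s as the cast suggests. `DriftSlopeDemo` and its method names are hypothetical.

```java
final class DriftSlopeDemo
{
    // The expression as it stood before the change.
    static double unguarded(long baseDrift, long nextDrift, long from, long to)
    {
        return (nextDrift - baseDrift) / (double) (to - from);
    }

    // The expression with the guard added in the diff.
    static double guarded(long baseDrift, long nextDrift, long from, long to)
    {
        return to == from ? 1 : (nextDrift - baseDrift) / (double) (to - from);
    }

    public static void main(String[] args)
    {
        // to == from with equal drifts: 0.0 / 0.0 yields NaN
        System.out.println(unguarded(0, 0, 10, 10));
        // to == from with differing drifts: a non-zero value divided by 0.0 is infinite
        System.out.println(unguarded(-3, 0, 10, 10));
        // the guard substitutes 1 in both cases
        System.out.println(guarded(0, 0, 10, 10));
    }
}
```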
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]