aweisberg commented on code in PR #4220:
URL: https://github.com/apache/cassandra/pull/4220#discussion_r2210827616


##########
src/java/org/apache/cassandra/batchlog/BatchlogManager.java:
##########
@@ -431,7 +431,7 @@ public boolean replay(RateLimiter rateLimiter, Set<UUID> hintedNodes) throws IOE
             if (accordMutations != null)
             {
                 accordTxnStart = accordTxnStart.withStartedAt(Clock.Global.nanoTime());
-                accordResult = accordMutations != null ? mutateWithAccordAsync(cm, accordMutations, null, accordTxnStart) : null;
+                accordResult = accordMutations != null ? mutateWithAccordAsync(cm, accordMutations, null, accordTxnStart, true) : null;

Review Comment:
   Can you make `preserveTimestamps` an enum? Then we can document the enum 
values, and anywhere the enum is used it will be easy to discover what it's 
all about.
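
   As a rough illustration of the enum idea, here is a minimal sketch. The name 
`PreserveTimestamp`, its values, and the helper `fromBoolean` are hypothetical 
and not part of the PR; the Javadoc wording is an assumption drawn from this 
review, not settled semantics.

```java
/**
 * Hypothetical replacement for a boolean preserveTimestamps parameter.
 * Names and wording are illustrative only.
 */
enum PreserveTimestamp
{
    /**
     * Keep the timestamps already present in the mutation, for example when
     * replaying from the batchlog or hints.
     */
    YES,

    /**
     * The transactional path may apply its own timestamp instead of the one
     * carried by the mutation.
     */
    NO;

    /** Bridges call sites that still pass a boolean (hypothetical helper). */
    static PreserveTimestamp fromBoolean(boolean preserve)
    {
        return preserve ? YES : NO;
    }

    boolean preserve()
    {
        return this == YES;
    }
}
```

   A call site would then read `mutateWithAccordAsync(..., PreserveTimestamp.YES)` 
rather than a bare `true`, and the Javadoc is reachable from every use.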



##########
src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java:
##########
@@ -86,23 +88,26 @@ public class TxnWrite extends AbstractKeySorted<TxnWrite.Update> implements Writ
     public static final TxnWrite EMPTY_CONDITION_FAILED = new TxnWrite(TableMetadatas.none(), Collections.emptyList(), false);
 
     private static final long EMPTY_SIZE = ObjectSizes.measure(EMPTY_CONDITION_FAILED);
+    private static final int FLAG_USER_TIMESTAMP = 0x01;
 
     public static class Update extends AbstractParameterisedVersionedSerialized<PartitionUpdate, TableMetadatas>
     {
-        private static final long EMPTY_SIZE = ObjectSizes.measure(new Update(null, 0, ByteBufferUtil.EMPTY_BYTE_BUFFER));
+        private static final long EMPTY_SIZE = ObjectSizes.measure(new Update(null, 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, false));
         public final PartitionKey key;
         public final int index;
+        public final boolean userTimestamp;
 
-        public Update(PartitionKey key, int index, PartitionUpdate update, TableMetadatas tables)
+        public Update(PartitionKey key, int index, PartitionUpdate update, TableMetadatas tables, boolean userTimestamp)

Review Comment:
   I don't think this is the right granularity for `userTimestamp`: it applies 
to a whole partition update, which contains multiple rows, some of which might 
preserve their timestamp and some of which might not.
   
   We already capture what to do with timestamps for the whole partition update 
with `preserveTimestamps`, so isn't it redundant to add `userTimestamp`?



##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -1248,20 +1249,22 @@ public static void mutateWithTriggers(List<? extends IMutation> mutations,
         if (augmented != null || mutateAtomically || updatesView)
             mutateAtomically(augmented != null ? augmented : (List<Mutation>)mutations, consistencyLevel, updatesView, requestTime);
         else
-            dispatchMutationsWithRetryOnDifferentSystem(mutations, consistencyLevel, requestTime);
+            dispatchMutationsWithRetryOnDifferentSystem(mutations, consistencyLevel, requestTime, preserveTimestamps);
     }
 
-    public static void dispatchMutationsWithRetryOnDifferentSystem(List<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, Dispatcher.RequestTime requestTime)
+    public static void dispatchMutationsWithRetryOnDifferentSystem(List<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, Dispatcher.RequestTime requestTime, boolean preserveTimestamps)
     {
         while (true)
         {
             ClusterMetadata cm = ClusterMetadata.current();
             try
             {
-                SplitMutations splitMutations = splitMutationsIntoAccordAndNormal(cm, (List<IMutation>)mutations);
+                SplitMutations<?> splitMutations = splitMutationsIntoAccordAndNormal(cm, (List<IMutation>)mutations);
                 List<? extends IMutation> accordMutations = splitMutations.accordMutations();
-                IAccordResult<TxnResult> accordResult = accordMutations != null ? mutateWithAccordAsync(cm, accordMutations, consistencyLevel, requestTime) : null;
                 List<? extends IMutation> normalMutations = splitMutations.normalMutations();
+                if (!preserveTimestamps && normalMutations != null)

Review Comment:
   Add a comment explaining that if there was ever any attempt to apply part of 
the mutation using the eventually consistent path, then we need to keep using 
the timestamp from that path so we don't end up with multiple timestamps. If 
the mutation only ever used the transactional path, then we can use the 
transactional timestamp to get linearizability.
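
   The rule described above can be sketched roughly as follows. This is only 
an illustration: `TimestampChoiceSketch`, `chooseTimestamp`, and its 
parameters are hypothetical names, not code from `StorageProxy`.

```java
final class TimestampChoiceSketch
{
    /**
     * @param attemptedEventuallyConsistentPath true if any part of the mutation was ever
     *        applied through the eventually consistent path (hypothetical flag)
     */
    static long chooseTimestamp(boolean attemptedEventuallyConsistentPath,
                                long eventuallyConsistentTimestamp,
                                long transactionalTimestamp)
    {
        // Once the eventually consistent path has been tried, keep using its timestamp
        // so the same write does not end up with multiple timestamps.
        if (attemptedEventuallyConsistentPath)
            return eventuallyConsistentTimestamp;

        // Only the transactional path was ever used, so the transactional timestamp
        // can be applied to get linearizability.
        return transactionalTimestamp;
    }
}
```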



##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -86,6 +87,7 @@ public class Mutation implements IMutation, Supplier<Mutation>
     volatile long viewLockAcquireStart;
 
     private final boolean cdcEnabled;
+    private final boolean userTimestamp;

Review Comment:
   I don't think this flag makes sense or has utility. Only the coordinator 
needs to decide whether to preserve the timestamp, so the flag doesn't need to 
be serialized and carried with the mutation.
   
   Batchlog and hints always preserve timestamps, so they don't need it 
serialized.
   
   The granularity isn't fine enough to do anything useful at the coordinator, 
and `mutateWithTriggers` already has this information passed in as a 
parameter. I really don't think it's worth the noise in `Mutation` if it's 
never going to be usable for anything.



##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -552,17 +570,18 @@ private Serialization serialization(Mutation mutation, int version)
         }
 
         static void serializeInternal(PartitionUpdate.PartitionUpdateSerializer serializer,
-                                         Mutation mutation,
-                                         DataOutputPlus out,
-                                         int version) throws IOException
+                                      Mutation mutation,
+                                      DataOutputPlus out,
+                                      int version) throws IOException
         {
             Map<TableId, PartitionUpdate> modifications = mutation.modifications;
 
             if (version >= VERSION_51)
             {
                 int flags = 0;
                 flags |= potentialTxnConflictsFlag(mutation.potentialTxnConflicts);
-                out.write(flags);
+                flags |= userTimestampFlag(mutation.userTimestamp);
+                out.writeUnsignedVInt32(flags);

Review Comment:
   Using a vint is fine, but note that `write` with a 32-bit int writes 1 byte, 
not 4. Since serialization is versioned, we don't really need to burn 
instructions on a vint unless we want to add flags without bumping the version?
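
   To make the 1-byte point concrete, here is a small sketch using plain 
`java.io.DataOutputStream`, whose `write(int)` writes only the low-order eight 
bits. It assumes `DataOutputPlus` follows the same `java.io.DataOutput` 
contract for `write(int)`; the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class FlagsWriteSketch
{
    // DataOutput.write(int) emits a single byte: the low-order eight bits of the argument.
    static byte[] writeFlags(int flags)
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes))
        {
            out.write(flags);
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // DataOutput.writeInt(int) emits all four bytes, high byte first.
    static byte[] writeFlagsAsInt(int flags)
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes))
        {
            out.writeInt(flags);
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

   With only two flag bits in use, the single byte written by `write(flags)` 
holds everything; any bits above the low eight would be silently dropped.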



##########
src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java:
##########
@@ -262,17 +266,18 @@ public static IAccordResult<TxnResult> mutateWithAccordAsync(ClusterMetadata cm,
         long minEpoch = Epoch.EMPTY.getEpoch();
         for (IMutation mutation : mutations)
         {
+            boolean userTimestamp = mutation instanceof Mutation && ((Mutation) mutation).isUserTimestamp();

Review Comment:
   Again, this isn't a useful granularity. In practice a mutation is not the 
grouping at which `userTimestamp` is set; isn't it per row?
   
   We don't need this intermediate step, which only works with some CQL but not 
others, adds a lot of noise, and needs to be replaced anyway.
   
   Just the flag to `mutateWithTriggers` is enough to either error out on user 
timestamps or preserve them. Partially linearizable batches are just wonky, 
and what they achieve isn't really worth this amount of code, which needs to 
be replaced anyway.



##########
src/java/org/apache/cassandra/cql3/statements/BatchStatement.java:
##########
@@ -357,7 +357,7 @@ public List<? extends IMutation> getMutations(ClientState state,
         }
         // local is either executeWithoutConditions modifying a virtual table (doesn't support txns) or executeLocal
         // which is called by test or internal things that are bypassing distributed system modification/checks
-        return collector.toMutations(state, local ? PotentialTxnConflicts.ALLOW : PotentialTxnConflicts.DISALLOW);
+        return collector.toMutations(state, local ? PotentialTxnConflicts.ALLOW : PotentialTxnConflicts.DISALLOW, attrs.isTimestampSet());

Review Comment:
   I don't think it will ever be that useful. We pass in whether to preserve 
timestamps to `mutateWithTriggers`, and I think that is enough. We don't need 
to serialize it with the mutation because it's only needed for batchlog and 
hints, where it is flipped to true anyway by necessity.
   
   Anything that fully addresses it will need to preserve timestamps at 
per-statement granularity.
   
   Anything that partially addresses it can use the boolean passed to 
`mutateWithTriggers` to signal an error in cases where the timestamp wouldn't 
be preserved.



##########
test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java:
##########
@@ -206,10 +211,55 @@ protected <S extends BaseState> Property.StatefulSuccess<S, Void> onSuccess(Logg
         });
     }
 
+    protected static <S extends CommonState> Property.Command<S, Void, ?> validateUsingTimestamp(RandomSource rs, S state)
+    {
+        if (state.operations == 0)
+            return ignoreCommand();
+        var builder = Select.builder(state.metadata);
+        for (var c : state.model.factory.regularAndStaticColumns)
+            builder.selection(FunctionCall.writetime(c));
+        ByteBuffer upperboundTimestamp = LongType.instance.decompose((long) state.operations);
+        var select = builder.build();
+        var inst = state.selectInstance(rs);
+        return new Property.SimpleCommand<>(state.humanReadable(select, null), s -> {
+            var result = s.executeQuery(inst, Integer.MAX_VALUE, s.selectCl(), select);
+            for (var row : result)
+            {
+                for (var col : state.model.factory.regularAndStaticColumns)
+                {
+                    int idx = state.model.factory.regularAndStaticColumns.indexOf(col);
+                    ByteBuffer value = row[idx];
+                    if (value == null) continue;
+                    if (col.type().isMultiCell())
+                    {
+                        List<ByteBuffer> timestamps = LONG_LIST_TYPE.unpack(value);
+                        int cellIndex = 0;
+                        for (var timestamp : timestamps)
+                        {
+                            Assertions.assertThat(LongType.instance.compare(timestamp, upperboundTimestamp))
+                                      .describedAs("Unexected timestamp at multi-cell index %s for col %s: %s > %s", cellIndex, col, LongType.instance.compose(timestamp), state.operations)
+                                      .isLessThanOrEqualTo(state.operations);
+                            cellIndex++;
+                        }
+                    }
+                    else
+                    {
+                        Assertions.assertThat(LongType.instance.compare(value, upperboundTimestamp))
+                                  .describedAs("Unexected timestamp for col %s: %s > %s", col, LongType.instance.compose(value), state.operations)
+                                  .isLessThanOrEqualTo(state.operations);
+                    }
+                }
+            }
+        });
+    }
+
     protected static <S extends CommonState> Property.Command<S, Void, ?> insert(RandomSource rs, S state)
     {
         int timestamp = ++state.operations;
-        Mutation mutation = state.mutationGen().next(rs).withTimestamp(timestamp);
+        Mutation original = state.mutationGen().next(rs);
+        Mutation mutation = state.allowUsingTimestamp()
+                            ? original.withTimestamp(timestamp)

Review Comment:
   This changes the timestamp of the entire mutation rather than of specific 
rows in it. That is why the implementation driven by this test satisfies the 
test but doesn't actually work correctly for real-world unlogged 
single-partition batches, which can have different rows with different 
timestamps.
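
   A toy sketch of the difference, assuming a batch where only some rows carry 
a user-supplied timestamp. `RowWrite` and `PerRowTimestampSketch` are 
hypothetical stand-ins, not Cassandra classes, and `stampWholeMutation` only 
mirrors the whole-mutation behaviour described in this comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical stand-in for one row's write in a batch; not a Cassandra class.
final class RowWrite
{
    final String clustering;
    final OptionalLong userTimestamp; // empty when the statement had no USING TIMESTAMP

    RowWrite(String clustering, OptionalLong userTimestamp)
    {
        this.clustering = clustering;
        this.userTimestamp = userTimestamp;
    }
}

final class PerRowTimestampSketch
{
    // Mutation-level view: one timestamp stamped over every row.
    static List<Long> stampWholeMutation(List<RowWrite> rows, long timestamp)
    {
        List<Long> result = new ArrayList<>();
        for (RowWrite ignored : rows)
            result.add(timestamp);
        return result;
    }

    // Row-level view: keep each row's own user timestamp, otherwise use the assigned one.
    static List<Long> resolvePerRow(List<RowWrite> rows, long assignedTimestamp)
    {
        List<Long> result = new ArrayList<>();
        for (RowWrite row : rows)
            result.add(row.userTimestamp.orElse(assignedTimestamp));
        return result;
    }
}
```

   A test that only ever stamps the whole mutation never produces the mixed 
case that `resolvePerRow` handles, so it cannot tell the two approaches apart.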



##########
test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedTime.java:
##########
@@ -274,7 +274,7 @@ public long nanoTime()
                 nextDrift = nanosDriftSupplier.get(random);
                 from = global;
                 to = global + Math.max(baseDrift, nextDrift);
-                diffPerGlobal = (nextDrift - baseDrift) / (double)(to - from);
+                diffPerGlobal = to == from ? 1 : (nextDrift - baseDrift) / (double)(to - from);

Review Comment:
   I spent 5 minutes looking at this and didn't grok what `diffPerGlobal` is 
doing.
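
   For context on the `to == from` guard in this hunk: in Java, dividing by a 
floating-point zero does not throw, it yields `NaN` or an infinity. The sketch 
below shows only that arithmetic with made-up values; the class and method 
names are hypothetical, and it makes no claim about what the value `1` means 
for `diffPerGlobal`.

```java
final class DriftDivisionSketch
{
    // The expression from the removed line, with hypothetical inputs.
    static double unguarded(long nextDrift, long baseDrift, long from, long to)
    {
        return (nextDrift - baseDrift) / (double) (to - from);
    }

    // The expression from the added line: falls back to 1 when the interval is empty.
    static double guarded(long nextDrift, long baseDrift, long from, long to)
    {
        return to == from ? 1 : (nextDrift - baseDrift) / (double) (to - from);
    }
}
```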



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

