rpuch commented on code in PR #4821:
URL: https://github.com/apache/ignite-3/pull/4821#discussion_r1883812229
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -836,7 +836,7 @@ private RaftGroupOptions groupOptionsForPartition(boolean isVolatileStorage, @Nu
}
raftGroupOptions.snapshotStorageFactory(snapshotFactory);
-
+ raftGroupOptions.maxClockSkew((int) clockService.maxClockSkewMillis());
Review Comment:
Why is `maxClockSkew` of type `int`? We usually represent times in milliseconds as `long`s; this avoids the cast.
##########
modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java:
##########
@@ -110,6 +110,35 @@ public void update(T newValue, @Nullable R futureResult) {
}
}
+ /**
+ * Strict update with reordering check. Always called from the same updater thread.
+ *
+ * @param newValue New value.
+ * @param futureResult A result that will be used to complete a future returned by the
+ * {@link PendingComparableValuesTracker#waitFor(Comparable)}.
+ */
+ public void updateStrict(T newValue, @Nullable R futureResult) {
+ if (!busyLock.readLock().tryLock()) {
+ throw new TrackerClosedException();
+ }
+
+ try {
+ Map.Entry<T, @Nullable R> current = this.current;
+
+ IgniteBiTuple<T, @Nullable R> newEntry = new IgniteBiTuple<>(newValue, futureResult);
+
+ // Entries from the same batch receive equal safe timestamps.
Review Comment:
This class is generic: it tracks arbitrary comparable values, not just timestamps, so the comment about 'safe timestamps' looks out of place here. Also, which batches are meant here? It looks like details from the context where this method is used have leaked into it.
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java:
##########
@@ -58,6 +58,11 @@ public class RaftGroupOptions {
*/
private @Nullable Long externallyEnforcedConfigIndex;
+ /**
+ * Max clock skew in the replication group in milliseconds.
+ */
+ private int maxClockSkew;
Review Comment:
```suggestion
private int maxClockSkewMs;
```
This makes the unit obvious at every usage site.
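A minimal sketch of what the `long` and `Ms` suggestions amount to, assuming a simplified options holder (`SkewOptionsSketch` is hypothetical, not the real `RaftGroupOptions` API): the field is a `long` with the unit in its name, so a `long` millisecond value such as the result of `clockService.maxClockSkewMillis()` can be passed without a narrowing `(int)` cast.

```java
/**
 * Hypothetical options holder illustrating the review suggestions:
 * keep the skew as a long and put the unit into the name.
 */
final class SkewOptionsSketch {
    /** Max clock skew in the replication group, in milliseconds. */
    private long maxClockSkewMs;

    /** Fluent setter; accepts a long, so callers need no narrowing cast. */
    SkewOptionsSketch maxClockSkewMs(long maxClockSkewMs) {
        this.maxClockSkewMs = maxClockSkewMs;
        return this;
    }

    long maxClockSkewMs() {
        return maxClockSkewMs;
    }
}
```

With an `int` field the call site needs `(int) millis`, and `(int) 3_000_000_000L` silently wraps to a negative number; with a `long` field the value is stored as-is.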
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -300,10 +279,10 @@ private void onWriteBusy(Iterator<CommandClosure<WriteCommand>> iterator) {
* @param commandIndex Index of the RAFT command.
* @param commandTerm Term of the RAFT command.
*/
- private UpdateCommandResult handleUpdateCommand(UpdateCommand cmd, long commandIndex, long commandTerm) {
+ private UpdateCommandResult handleUpdateCommand(UpdateCommand cmd, long commandIndex, long commandTerm, boolean[] applied) {
// Skips the write command because the storage has already executed it.
if (commandIndex <= storage.lastAppliedIndex()) {
- return new UpdateCommandResult(true, isPrimaryInGroupTopology());
+ return null;
Review Comment:
Is it OK to return `null` here? The method is not declared as returning a nullable value. If returning `null` is intended, please annotate the method with `@Nullable` and/or add a comment explaining why it's safe.
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -1131,7 +1131,6 @@ private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica> replicaFu
ReplicaSafeTimeSyncRequest req = REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
.groupId(toReplicationGroupIdMessage(replica.groupId()))
- .proposedSafeTime(proposedSafeTime)
Review Comment:
Let's remove `proposedSafeTime` from the parameters of this method and of all methods that pass it here.
##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java:
##########
@@ -17,34 +17,49 @@
package org.apache.ignite.distributed;
+import static java.util.stream.Collectors.toList;
import static org.apache.ignite.distributed.ItTxTestCluster.NODE_PORT_BASE;
import static org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
-import java.util.Collection;
+import java.util.List;
import org.apache.ignite.internal.TestHybridClock;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.table.TxInfrastructureTest;
+import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.tx.TxStateMeta;
-import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
/**
- * Tests if commit timestamp is propagated to observable time correctly.
+ * Tests if commit timestamp and safe timestamp are monotonically grow on leader change.
Review Comment:
```suggestion
* Tests if commit timestamp and safe timestamp monotonically grow on leader change.
```
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java:
##########
@@ -285,8 +289,10 @@ private class LogEntryAndClosureHandler implements EventHandler<LogEntryAndClosu
// task list for batch
private final List<LogEntryAndClosure> tasks = new ArrayList<>(NodeImpl.this.raftOptions.getApplyBatch());
+ private @Nullable HybridTimestamp safeTs = null;
Review Comment:
Safe time doesn't seem to belong to the Raft protocol. Should we track it elsewhere, outside the JRaft core code?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1956,15 +1935,13 @@ private CompletableFuture<WriteIntentSwitchReplicatedInfo> applyWriteIntentSwitc
indexIdsAtRwTxBeginTs(transactionId)
);
- CompletableFuture<Object> resultFuture = new CompletableFuture<>();
-
- applyCmdWithRetryOnSafeTimeReorderException(wiSwitchCmd, resultFuture);
-
- return resultFuture
+ return applyCmdWithExceptionHandling(wiSwitchCmd)
.exceptionally(e -> {
LOG.warn("Failed to complete transaction cleanup command [txId=" + transactionId + ']', e);
- return nullCompletedFuture();
+ ExceptionUtils.sneakyThrow(e);
Review Comment:
If the exception is rethrown anyway, `exceptionally()` is not needed. Also, why was this changed to rethrow the exception?
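If the intent is to log the failure and still propagate it to the caller, `whenComplete` does both without a handler that rethrows. A minimal sketch using only `java.util.concurrent`; the `logFailure` helper and the `java.util.logging` logger are stand-ins for illustration, not Ignite's `IgniteLogger` or an existing utility.

```java
import java.util.concurrent.CompletableFuture;
import java.util.logging.Logger;

final class LogAndPropagate {
    private static final Logger LOG = Logger.getLogger(LogAndPropagate.class.getName());

    /**
     * Logs a failure of {@code future} and returns a stage with the same outcome:
     * the value on success, or the same exception (wrapped in CompletionException) on failure.
     */
    static <T> CompletableFuture<T> logFailure(CompletableFuture<T> future, String message) {
        return future.whenComplete((res, err) -> {
            if (err != null) {
                LOG.warning(message + ": " + err);
            }
        });
    }
}
```

Unlike `exceptionally()`, `whenComplete` cannot replace the outcome, so nothing needs to be rethrown to keep the failure visible to callers.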
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/marshaller/PartitionCommandsMarshallerImpl.java:
##########
@@ -31,39 +32,51 @@ public PartitionCommandsMarshallerImpl(MessageSerializationRegistry serializatio
super(serializationRegistry, cache);
}
+ @Override
+ public void patch(ByteBuffer raw, HybridTimestamp safeTs) {
+ ByteBuffer dup = raw.duplicate().order(ORDER);
+ dup.putLong(4, safeTs.longValue());
+ }
+
@Override
protected void beforeWriteMessage(Object o, ByteBuffer buffer) {
int requiredCatalogVersion = o instanceof CatalogVersionAware
? ((CatalogVersionAware) o).requiredCatalogVersion()
- : NO_VERSION_REQUIRED;
+ : PartitionCommandsMarshaller.NO_VERSION_REQUIRED;
stream.setBuffer(buffer);
- stream.writeInt(requiredCatalogVersion);
+ stream.writeFixedInt(requiredCatalogVersion);
Review Comment:
Please add a comment explaining why fixed-size ints are used (probably to make the message patchable?).
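One plausible reason, assuming the non-fixed `writeInt` uses a variable-length (varint) encoding: with a varint, the number of bytes taken by `requiredCatalogVersion` depends on its value, so the field after it would not start at a fixed offset, while `patch()` writes the safe time at the hard-coded offset 4. The helper below uses a LEB128-style size computation as a stand-in; it is illustrative and is not Ignite's actual stream format.

```java
final class VarIntSketch {
    /** Bytes needed to encode {@code value} as an unsigned LEB128-style varint (7 payload bits per byte). */
    static int varIntSize(int value) {
        int size = 1;
        while ((value & ~0x7F) != 0) {
            value >>>= 7;
            size++;
        }
        return size;
    }

    /** Offset at which the field following the catalog version would start. */
    static int nextFieldOffset(int catalogVersion, boolean fixedSize) {
        return fixedSize ? Integer.BYTES : varIntSize(catalogVersion);
    }
}
```

With the fixed-size encoding the next field always starts at offset 4, whatever the catalog version is; with a varint it moves between 1 and 5 bytes.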
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java:
##########
@@ -121,6 +119,8 @@ public interface TimeoutNowRequest extends Message {
String peerId();
long term();
+
+ @Nullable HybridTimestamp timestamp();
Review Comment:
Why is it nullable? `Replicator` seems to always send a timestamp.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -627,19 +592,29 @@ public MvPartitionStorage getMvStorage() {
return storage.getStorage();
}
+ /**
+ * Returns safe timestamp.
+ */
+ @TestOnly
+ public PendingComparableValuesTracker<HybridTimestamp, Void> getSafeTime() {
Review Comment:
We usually don't use `get` prefixes. How about `safeTime()` or
`safeTimeTracker()` (to distinguish it from actual safe time timestamps)?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java:
##########
@@ -31,28 +32,40 @@ public PartitionCommandsMarshallerImpl(MessageSerializationRegistry serializatio
super(serializationRegistry, cache);
}
+ @Override
+ public void patch(ByteBuffer raw, HybridTimestamp safeTs) {
+ ByteBuffer dup = raw.duplicate().order(ORDER);
+ dup.putLong(4, safeTs.longValue());
+ }
+
@Override
protected void beforeWriteMessage(Object o, ByteBuffer buffer) {
int requiredCatalogVersion = o instanceof CatalogVersionAware
? ((CatalogVersionAware) o).requiredCatalogVersion()
: NO_VERSION_REQUIRED;
stream.setBuffer(buffer);
- stream.writeInt(requiredCatalogVersion);
+ stream.writeFixedInt(requiredCatalogVersion);
+ stream.writeFixedLong(0);
Review Comment:
Please add a comment saying that this reserves space for the safe time.
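The reserve-then-patch pattern that the comment should document can be sketched with plain `java.nio.ByteBuffer`. The class and constant names are hypothetical; the offset (4, right after a fixed 4-byte int) mirrors the `patch()` method in the diff. Note that `ByteBuffer.duplicate()` always returns a big-endian buffer, which is why the diff re-applies `order(ORDER)` before the absolute `putLong`.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class ReservedSlotSketch {
    /** The safe time slot starts right after the fixed-size 4-byte catalog version. */
    static final int SAFE_TS_OFFSET = Integer.BYTES;

    static ByteBuffer writeHeader(int catalogVersion, ByteOrder order) {
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + Long.BYTES).order(order);
        buf.putInt(catalogVersion);
        // Reserve 8 bytes for the safe time; the real value is patched in later, in place.
        buf.putLong(0L);
        buf.flip();
        return buf;
    }

    static void patch(ByteBuffer raw, long safeTs, ByteOrder order) {
        // duplicate() shares the content but resets the byte order to BIG_ENDIAN, so restore it.
        ByteBuffer dup = raw.duplicate().order(order);
        // Absolute put: neither dup's nor raw's position changes.
        dup.putLong(SAFE_TS_OFFSET, safeTs);
    }
}
```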
##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java:
##########
@@ -256,14 +266,19 @@ private <T extends Command> T copyCommand(T cmd) {
.commit(finishTxCommand.commit())
.partitionIds(finishTxCommand.partitionIds())
.commitTimestamp(finishTxCommand.commitTimestamp())
+ .initiatorTime(finishTxCommand.initiatorTime())
+ .safeTime(finishTxCommand.safeTime())
.build();
} else if (cmd instanceof WriteIntentSwitchCommand) {
WriteIntentSwitchCommand writeIntentSwitchCommand = (WriteIntentSwitchCommand) cmd;
return (T) PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommand()
.txId(writeIntentSwitchCommand.txId())
+ .initiatorTime(clock.now())
.commit(writeIntentSwitchCommand.commit())
.commitTimestamp(writeIntentSwitchCommand.commitTimestamp())
+ .initiatorTime(writeIntentSwitchCommand.initiatorTime())
Review Comment:
`initiatorTime` is set twice; the second call overwrites the `clock.now()` value.
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java:
##########
@@ -94,6 +99,10 @@ public boolean isPrimaryInPeersAndLearners() {
return primaryInPeersAndLearners;
}
+ public long safeTimestamp() {
Review Comment:
Please add a javadoc explaining what this value is.
##########
modules/metastorage/build.gradle:
##########
@@ -38,6 +39,7 @@ dependencies {
implementation project(':ignite-failure-handler')
implementation project(':ignite-metrics')
implementation project(':ignite-system-disaster-recovery-api')
+ implementation project(':ignite-catalog')
Review Comment:
Why was this dependency added?
##########
modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java:
##########
@@ -110,6 +110,35 @@ public void update(T newValue, @Nullable R futureResult) {
}
}
+ /**
+ * Strict update with reordering check. Always called from the same updater thread.
Review Comment:
Please explain in the javadoc what 'strict' means.
##########
modules/metastorage/build.gradle:
##########
@@ -28,6 +28,7 @@ dependencies {
implementation project(':ignite-configuration-system')
implementation project(':ignite-cluster-management')
implementation project(':ignite-network-api')
+ implementation project(':ignite-network')
Review Comment:
Why was this added?
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java:
##########
@@ -889,7 +905,7 @@ public boolean bootstrap(final BootstrapOptions opts) throws InterruptedExceptio
final long bootstrapLogTerm = opts.getLastLogIndex() > 0 ? 1 : 0;
final LogId bootstrapId = new LogId(opts.getLastLogIndex(),
bootstrapLogTerm);
this.options = opts.getNodeOptions() == null ? new NodeOptions() : opts.getNodeOptions();
- this.clock = options.getClock();
+ this.clock = this.options.getClock() == null ? new HybridClockImpl() : this.options.getClock();
Review Comment:
Why do we need to support different clocks here? If there is no clock in the options, should we just skip all safe-time-related logic?
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java:
##########
@@ -447,6 +447,10 @@ public boolean startRaftNode(
// Thread pools are shared by all raft groups.
NodeOptions nodeOptions = opts.copy();
+ // Then a new election starts on a node, it has local physical time higher than last generated safe ts
Review Comment:
```suggestion
// When a new election starts on a node, it has local physical time higher than last generated safe ts
```
##########
modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java:
##########
@@ -44,7 +51,16 @@ public interface HybridClock {
HybridTimestamp now();
/**
- * Gets a current timestamp. It is a fast way to get timestamp because it doesn't have to tick the logical part of the clock.
+ * Creates a timestamp for new event. A timestamp is guarantied to be unique and monotonically grown and follow the causal.
+ *
+ * @param causal The causal timestamp.
+ *
+ * @return The hybrid timestamp.
+ */
+ HybridTimestamp now(HybridTimestamp causal);
Review Comment:
Do I understand correctly that this new method is logically equivalent to
`update()` followed by `now()`? If so, is it worth adding at all? It is only
used in one place, and the corresponding `nowLong()` is not used externally at
all.
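To make the question concrete, here is a simplified hybrid logical clock: the hypothetical `SketchHybridClock` packs physical milliseconds and a 16-bit logical counter into one `long` and is not Ignite's `HybridClockImpl`. In this sketch, a `now(causal)`-style operation is just `update(causal)` followed by `now()`, and its result is strictly greater than both `causal` and every earlier timestamp.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical HLC sketch: timestamp = (physicalMillis << 16) | logical. */
final class SketchHybridClock {
    private static final int LOGICAL_BITS = 16;

    private final LongSupplier physicalMillis;
    private final AtomicLong latest = new AtomicLong();

    SketchHybridClock(LongSupplier physicalMillis) {
        this.physicalMillis = physicalMillis;
    }

    /** Returns a unique timestamp strictly greater than any previously returned one. */
    long now() {
        return latest.updateAndGet(prev -> Math.max(prev + 1, physicalMillis.getAsLong() << LOGICAL_BITS));
    }

    /** Advances the clock past {@code causal} and returns the new value. */
    long update(long causal) {
        return latest.updateAndGet(
                prev -> Math.max(Math.max(prev, causal) + 1, physicalMillis.getAsLong() << LOGICAL_BITS));
    }

    /** The proposed now(causal), expressed through the two existing operations. */
    long nowAfter(long causal) {
        update(causal);
        return now();
    }
}
```

In this sketch `update(causal)` alone already returns a value greater than `causal`; the extra `now()` only adds one more logical tick.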
##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java:
##########
@@ -119,6 +120,8 @@ public void testRemoveCommand() throws Exception {
.rowUuid(UUID.randomUUID())
.txId(TestTransactionIds.newTransactionId())
.txCoordinatorId(UUID.randomUUID())
+ .txCoordinatorId(UUID.randomUUID())
Review Comment:
A duplicated line
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -241,30 +215,46 @@ private void onWriteBusy(Iterator<CommandClosure<WriteCommand>> iterator) {
storage.acquirePartitionSnapshotsReadLock();
try {
+ boolean[] applied = {false};
+
if (command instanceof UpdateCommand) {
- result = handleUpdateCommand((UpdateCommand) command, commandIndex, commandTerm);
+ result = handleUpdateCommand((UpdateCommand) command, commandIndex, commandTerm, applied);
} else if (command instanceof UpdateAllCommand) {
- result = handleUpdateAllCommand((UpdateAllCommand) command, commandIndex, commandTerm);
+ result = handleUpdateAllCommand((UpdateAllCommand) command, commandIndex, commandTerm, applied);
} else if (command instanceof FinishTxCommand) {
- result = handleFinishTxCommand((FinishTxCommand) command, commandIndex, commandTerm);
+ result = handleFinishTxCommand((FinishTxCommand) command, commandIndex, commandTerm, applied);
} else if (command instanceof WriteIntentSwitchCommand) {
- handleWriteIntentSwitchCommand((WriteIntentSwitchCommand) command, commandIndex, commandTerm);
+ handleWriteIntentSwitchCommand((WriteIntentSwitchCommand) command, commandIndex, commandTerm, applied);
} else if (command instanceof SafeTimeSyncCommand) {
- handleSafeTimeSyncCommand((SafeTimeSyncCommand) command, commandIndex, commandTerm);
+ handleSafeTimeSyncCommand((SafeTimeSyncCommand) command, commandIndex, commandTerm, applied);
} else if (command instanceof BuildIndexCommand) {
- handleBuildIndexCommand((BuildIndexCommand) command, commandIndex, commandTerm);
+ handleBuildIndexCommand((BuildIndexCommand) command, commandIndex, commandTerm, applied);
} else if (command instanceof PrimaryReplicaChangeCommand) {
- handlePrimaryReplicaChangeCommand((PrimaryReplicaChangeCommand) command, commandIndex, commandTerm);
+ handlePrimaryReplicaChangeCommand((PrimaryReplicaChangeCommand) command, commandIndex, commandTerm, applied);
} else if (command instanceof VacuumTxStatesCommand) {
- handleVacuumTxStatesCommand((VacuumTxStatesCommand) command, commandIndex, commandTerm);
+ handleVacuumTxStatesCommand((VacuumTxStatesCommand) command, commandIndex, commandTerm, applied);
} else if (command instanceof UpdateMinimumActiveTxBeginTimeCommand) {
- handleUpdateMinimalActiveTxTimeCommand((UpdateMinimumActiveTxBeginTimeCommand) command, commandIndex, commandTerm);
+ handleUpdateMinimalActiveTxTimeCommand((UpdateMinimumActiveTxBeginTimeCommand) command, commandIndex, commandTerm,
+ applied);
} else {
assert false : "Command was not found [cmd=" + command + ']';
}
+
+ if (applied[0]) {
Review Comment:
This makes the code uglier than it was, only to work around the need to avoid triggering the 'strict' monotonicity check for safe time and index. I don't know how to improve this yet, but we should probably think about it.
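One possible direction for the `boolean[] applied` out-parameter, offered only as a hypothetical sketch (the names are invented, not from the PR): let each handler return a small immutable outcome that carries both the result and whether the command was actually applied. This also gives the "already applied" path (`commandIndex <= storage.lastAppliedIndex()`) an explicit value instead of `null`.

```java
/** Hypothetical handler outcome: replaces a boolean[] out-parameter and a null "skipped" result. */
final class CommandOutcome<R> {
    private static final CommandOutcome<?> SKIPPED = new CommandOutcome<>(null, false);

    private final R result;
    private final boolean applied;

    private CommandOutcome(R result, boolean applied) {
        this.result = result;
        this.applied = applied;
    }

    /** The command was applied to storage; trackers (safe time, applied index) may advance. */
    static <R> CommandOutcome<R> applied(R result) {
        return new CommandOutcome<>(result, true);
    }

    /** The command had already been applied (index <= last applied index) and was skipped. */
    @SuppressWarnings("unchecked")
    static <R> CommandOutcome<R> skipped() {
        return (CommandOutcome<R>) SKIPPED;
    }

    boolean isApplied() {
        return applied;
    }

    R result() {
        return result;
    }

    /** Example handler shape: the skip check yields an explicit outcome. */
    static CommandOutcome<String> handle(long commandIndex, long lastAppliedIndex) {
        if (commandIndex <= lastAppliedIndex) {
            return skipped();
        }
        return applied("applied@" + commandIndex);
    }
}
```

The dispatching code would then check `outcome.isApplied()` once after the `if`/`else if` chain instead of reading `applied[0]`.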
##########
modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java:
##########
@@ -110,6 +110,35 @@ public void update(T newValue, @Nullable R futureResult) {
}
}
+ /**
+ * Strict update with reordering check. Always called from the same updater thread.
+ *
+ * @param newValue New value.
+ * @param futureResult A result that will be used to complete a future returned by the
+ * {@link PendingComparableValuesTracker#waitFor(Comparable)}.
+ */
+ public void updateStrict(T newValue, @Nullable R futureResult) {
Review Comment:
It seems that this method cannot be used concurrently with the other updating methods, as there is no synchronization on the `current` update. This looks dangerous, as someone could try to use both. If this method is only used where the object is updated exclusively via `updateStrict` and never via the conventional `update`, maybe it would be better to create a separate tracker class that has only `updateStrict` (and no `update`)? Or a `StrictPendingComparableValuesTracker` whose `update` works in the strict way?
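A minimal sketch of the separate-class option, under two assumptions that the javadoc would need to confirm: "strict" means a new value may never be lower than the current one (equal values are allowed, since entries from one batch share a timestamp), and there is exactly one updater thread. The class is hypothetical and much simpler than `PendingComparableValuesTracker` (no busy lock, no future results).

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical single-writer tracker whose only mutator rejects regressions. */
final class StrictValuesTracker<T extends Comparable<T>> {
    private volatile T current;
    private final ConcurrentSkipListMap<T, CompletableFuture<Void>> waiters = new ConcurrentSkipListMap<>();

    StrictValuesTracker(T initial) {
        current = initial;
    }

    /** Must be called from a single updater thread; throws if the new value is lower than the current one. */
    void update(T newValue) {
        if (newValue.compareTo(current) < 0) {
            throw new IllegalStateException("Reordering detected [current=" + current + ", new=" + newValue + ']');
        }
        current = newValue;
        // Complete and remove every waiter whose target is <= newValue.
        Map<T, CompletableFuture<Void>> ready = waiters.headMap(newValue, true);
        ready.values().forEach(f -> f.complete(null));
        ready.clear();
    }

    /** Returns a future completed once the tracked value reaches {@code value}. */
    CompletableFuture<Void> waitFor(T value) {
        if (current.compareTo(value) >= 0) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> f = waiters.computeIfAbsent(value, k -> new CompletableFuture<>());
        // Re-check: an update may have raced with the registration above.
        if (current.compareTo(value) >= 0) {
            waiters.remove(value, f);
            f.complete(null);
        }
        return f;
    }

    T current() {
        return current;
    }
}
```

Because `update` is the only mutator, there is no way to mix strict and non-strict updates on the same instance.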
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java:
##########
@@ -296,6 +302,23 @@ public void onEvent(final LogEntryAndClosure event, final long sequence, final b
return;
}
+ // Patch the command.
+ if (event.done instanceof CommandClosure) {
+ CommandClosure<?> cmd = (CommandClosure<?>) event.done;
+ Command command = cmd.command();
+
+ // Tick once per batch.
+ if (safeTs == null) {
Review Comment:
It seems that the `safeTs` field is only used in `LogEntryAndClosureHandler`, but it's declared in `NodeImpl`. How about moving it into `LogEntryAndClosureHandler`?
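Keeping the per-batch timestamp as a field of the handler itself could look like the sketch below. It mirrors the "Tick once per batch" logic from the diff with hypothetical types (`Cmd`, a `LongSupplier` clock, and an explicit `endOfBatch` flag modeled on the Disruptor-style `onEvent(event, sequence, endOfBatch)` callback), and it resets the field when the batch is flushed so the next batch gets a fresh timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical batch handler: per-batch state lives in the handler, not in the enclosing node. */
final class BatchHandlerSketch {
    /** Stand-in for a command that carries a patchable safe time. */
    static final class Cmd {
        long safeTs;
    }

    private final LongSupplier clock;
    private final List<Cmd> tasks = new ArrayList<>();
    /** Null means the clock has not been ticked yet in the current batch. */
    private Long safeTs;

    BatchHandlerSketch(LongSupplier clock) {
        this.clock = clock;
    }

    /** Returns the flushed batch when {@code endOfBatch} is true, otherwise an empty list. */
    List<Cmd> onEvent(Cmd cmd, boolean endOfBatch) {
        if (safeTs == null) {
            safeTs = clock.getAsLong(); // Tick once per batch.
        }
        cmd.safeTs = safeTs;
        tasks.add(cmd);
        if (!endOfBatch) {
            return List.of();
        }
        List<Cmd> batch = new ArrayList<>(tasks);
        tasks.clear();
        safeTs = null; // The next batch gets a fresh timestamp.
        return batch;
    }
}
```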
##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java:
##########
@@ -151,6 +154,8 @@ public void testUpdateAllCommand() throws Exception {
.messageRowsToUpdate(rowsToUpdate)
.txId(UUID.randomUUID())
.txCoordinatorId(UUID.randomUUID())
+ .txCoordinatorId(UUID.randomUUID())
Review Comment:
Another duplicated line
--
This is an automated message from the Apache Git Service.