ibessonov commented on code in PR #4821:
URL: https://github.com/apache/ignite-3/pull/4821#discussion_r1879985979
##########
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/Command.java:
##########
@@ -17,10 +17,27 @@
package org.apache.ignite.internal.raft;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.network.NetworkMessage;
+import org.jetbrains.annotations.Nullable;
/**
* A marker interface for replication group command.
*/
public interface Command extends NetworkMessage {
+ /**
+ * This is called before a command is submitted to replication pipeline.
+ *
+ * @param safeTs Safe timestamp.
+ */
+ default void patch(HybridTimestamp safeTs) {}
Review Comment:
Shouldn't we only do this for write commands?
##########
modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java:
##########
@@ -313,6 +313,38 @@ public void writeInt(int val) {
writeVarInt(val + 1);
}
+ /** {@inheritDoc} */
+ @Override
+ public void writeFixedInt(int val) {
+ lastFinished = remainingInternal() >= Integer.BYTES;
+
+ if (lastFinished) {
+ int pos = buf.position();
+
+ GridUnsafe.putInt(heapArr, baseOff + pos, val);
Review Comment:
This call is system-dependent, whereas in our case I suppose we always want
to use little-endian byte order. A similar problem is solved in
`org.apache.ignite.internal.util.io.IgniteUnsafeDataOutput#putInt`, for
example; please change your implementation accordingly.
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/marshaller/PartitionCommandsMarshallerImpl.java:
##########
@@ -31,39 +32,51 @@ public
PartitionCommandsMarshallerImpl(MessageSerializationRegistry serializatio
super(serializationRegistry, cache);
}
+ @Override
+ public void patch(ByteBuffer raw, HybridTimestamp safeTs) {
+ ByteBuffer dup = raw.duplicate().order(ORDER);
+ dup.putLong(4, safeTs.longValue());
Review Comment:
Please introduce a named constant for the offset `4`.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -241,30 +215,46 @@ private void
onWriteBusy(Iterator<CommandClosure<WriteCommand>> iterator) {
storage.acquirePartitionSnapshotsReadLock();
try {
+ boolean[] applied = {false};
+
if (command instanceof UpdateCommand) {
- result = handleUpdateCommand((UpdateCommand) command,
commandIndex, commandTerm);
+ result = handleUpdateCommand((UpdateCommand) command,
commandIndex, commandTerm, applied);
} else if (command instanceof UpdateAllCommand) {
- result = handleUpdateAllCommand((UpdateAllCommand)
command, commandIndex, commandTerm);
+ result = handleUpdateAllCommand((UpdateAllCommand)
command, commandIndex, commandTerm, applied);
} else if (command instanceof FinishTxCommand) {
- result = handleFinishTxCommand((FinishTxCommand) command,
commandIndex, commandTerm);
+ result = handleFinishTxCommand((FinishTxCommand) command,
commandIndex, commandTerm, applied);
} else if (command instanceof WriteIntentSwitchCommand) {
- handleWriteIntentSwitchCommand((WriteIntentSwitchCommand)
command, commandIndex, commandTerm);
+ handleWriteIntentSwitchCommand((WriteIntentSwitchCommand)
command, commandIndex, commandTerm, applied);
} else if (command instanceof SafeTimeSyncCommand) {
- handleSafeTimeSyncCommand((SafeTimeSyncCommand) command,
commandIndex, commandTerm);
+ handleSafeTimeSyncCommand((SafeTimeSyncCommand) command,
commandIndex, commandTerm, applied);
} else if (command instanceof BuildIndexCommand) {
- handleBuildIndexCommand((BuildIndexCommand) command,
commandIndex, commandTerm);
+ handleBuildIndexCommand((BuildIndexCommand) command,
commandIndex, commandTerm, applied);
} else if (command instanceof PrimaryReplicaChangeCommand) {
-
handlePrimaryReplicaChangeCommand((PrimaryReplicaChangeCommand) command,
commandIndex, commandTerm);
+
handlePrimaryReplicaChangeCommand((PrimaryReplicaChangeCommand) command,
commandIndex, commandTerm, applied);
} else if (command instanceof VacuumTxStatesCommand) {
- handleVacuumTxStatesCommand((VacuumTxStatesCommand)
command, commandIndex, commandTerm);
+ handleVacuumTxStatesCommand((VacuumTxStatesCommand)
command, commandIndex, commandTerm, applied);
} else if (command instanceof
UpdateMinimumActiveTxBeginTimeCommand) {
-
handleUpdateMinimalActiveTxTimeCommand((UpdateMinimumActiveTxBeginTimeCommand)
command, commandIndex, commandTerm);
+
handleUpdateMinimalActiveTxTimeCommand((UpdateMinimumActiveTxBeginTimeCommand)
command, commandIndex, commandTerm,
+ applied);
} else {
assert false : "Command was not found [cmd=" + command +
']';
}
+
+ if (applied[0]) {
+ // Adjust safe time before completing update to reduce
waiting.
+ if (command instanceof SafeTimePropagatingCommand) {
+ SafeTimePropagatingCommand safeTimePropagatingCommand
= (SafeTimePropagatingCommand) command;
+
+ assert safeTimePropagatingCommand.safeTime() != null;
+
+ updateTrackerIgnoringTrackerClosedException(safeTime,
safeTimePropagatingCommand.safeTime());
+ }
+
+
updateTrackerIgnoringTrackerClosedException(storageIndexTracker, commandIndex);
Review Comment:
This was previously outside of the `PartitionSnapshotsReadLock`, possibly for a
reason. Please confirm that this change is safe.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1837,23 +1820,19 @@ private CompletableFuture<Object> applyFinishCommand(
int catalogVersion,
List<TablePartitionIdMessage> partitionIds
) {
- synchronized (commandProcessingLinearizationMutex) {
- FinishTxCommandBuilder finishTxCmdBldr =
PARTITION_REPLICATION_MESSAGES_FACTORY.finishTxCommand()
- .txId(transactionId)
- .commit(commit)
- .safeTime(clockService.now())
- .requiredCatalogVersion(catalogVersion)
- .partitionIds(partitionIds);
-
- if (commit) {
- finishTxCmdBldr.commitTimestamp(commitTimestamp);
- }
- CompletableFuture<Object> resultFuture = new CompletableFuture<>();
-
-
applyCmdWithRetryOnSafeTimeReorderException(finishTxCmdBldr.build(),
resultFuture);
+ HybridTimestamp now = clockService.now();
+ FinishTxCommandBuilder finishTxCmdBldr =
PARTITION_REPLICATION_MESSAGES_FACTORY.finishTxCommand()
+ .txId(transactionId)
+ .commit(commit)
+ .initiatorTime(now)
+ .requiredCatalogVersion(catalogVersion)
+ .partitionIds(partitionIds);
- return resultFuture;
+ if (commit) {
+ finishTxCmdBldr.commitTimestamp(commitTimestamp);
Review Comment:
Where can I read about our guarantees regarding `commitTimestamp` generation?
How is it related to safe time? I don't know much about it, and I have a
feeling that there could be issues with it.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java:
##########
@@ -59,7 +60,8 @@ public CheckCatalogVersionOnAppendEntries(CatalogService
catalogService) {
Node node = (Node) service;
- ByteBuffer allData = request.data().asReadOnlyBuffer();
+ // TODO use from marshaller
Review Comment:
Please add a link to the corresponding Jira ticket.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshaller.java:
##########
@@ -37,4 +37,6 @@ public interface PartitionCommandsMarshaller extends
Marshaller {
* @return Catalog version. {@value #NO_VERSION_REQUIRED} if version is
not required for the given command.
*/
int readRequiredCatalogVersion(ByteBuffer raw);
+
+ long readSafeTimestamp(ByteBuffer raw);
Review Comment:
Please provide javadocs for the new methods that you introduce in interfaces.
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/marshaller/PartitionCommandsMarshallerImpl.java:
##########
@@ -31,39 +32,51 @@ public
PartitionCommandsMarshallerImpl(MessageSerializationRegistry serializatio
super(serializationRegistry, cache);
}
+ @Override
+ public void patch(ByteBuffer raw, HybridTimestamp safeTs) {
+ ByteBuffer dup = raw.duplicate().order(ORDER);
Review Comment:
I think we should add an optimistic check for the case when `raw.order()`
matches `ORDER`; this way we will avoid unnecessary allocations. But this is up to you.
##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java:
##########
@@ -70,20 +85,90 @@ protected int replicas() {
@Override
protected HybridClock createClock(ClusterNode node) {
- // Client physical time is frozen in the past, server time advances
normally.
- return new TestHybridClock(() -> node.address().port() == CLIENT_PORT
? CLIENT_FROZEN_PHYSICAL_TIME : System.currentTimeMillis());
+ int idx = NODE_PORT_BASE - node.address().port() + 1;
+
+ // Physical time is frozen.
+ return new TestHybridClock(
+ () -> node.address().port() == CLIENT_PORT ?
CLIENT_FROZEN_PHYSICAL_TIME : CLIENT_FROZEN_PHYSICAL_TIME + 1000L * idx);
+ }
+
+ @Override
+ protected long getSafeTimePropagationTimeout() {
+ return 300_000;
}
@Test
public void testImplicitObservableTimePropagation() {
RecordView<Tuple> view = accounts.recordView();
view.upsert(null, makeValue(1, 100.0));
- TxManagerImpl clientTxManager = (TxManagerImpl)
txTestCluster.clientTxManager;
- Collection<TxStateMeta> states = clientTxManager.states();
+ List<TxStateMeta> states = txTestCluster.states();
assertEquals(1, states.size());
- HybridTimestamp commitTs = states.iterator().next().commitTimestamp();
+ HybridTimestamp commitTs = states.get(0).commitTimestamp();
+
+ LOG.info("commitTs={}", commitTs);
+
assertNotNull(commitTs);
assertEquals(commitTs, timestampTracker.get());
- assertTrue(commitTs.getPhysical() != CLIENT_FROZEN_PHYSICAL_TIME,
"Client time should be advanced to server time");
+
+ assertTrue(commitTs.compareTo(new
HybridTimestamp(CLIENT_FROZEN_PHYSICAL_TIME, 0)) > 0, "Observable timestamp
should be advanced");
+
+ TablePartitionId part = new TablePartitionId(accounts.tableId(), 0);
+
+ NodeImpl[] handle = {null};
+ NodeImpl[] leader = {null};
+
+
txTestCluster.raftServers().values().stream().map(Loza::server).forEach(s -> {
+ JraftServerImpl srv = (JraftServerImpl) s;
+ List<RaftGroupService> grps =
srv.localNodes().stream().map(srv::raftGroupService).collect(toList());
Review Comment:
You don't have to call `collect(toList())` if all you need is `forEach`;
please remove the `collect` call.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]