sanpwc commented on code in PR #4143:
URL: https://github.com/apache/ignite-3/pull/4143#discussion_r1701568428
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java:
##########
@@ -368,56 +360,51 @@ void onSnapshotLoad() {
result =
MSG_FACTORY.statementResult().result(ByteBuffer.wrap(entry.value())).build();
}
- idempotentCommandCache.put(commandId, new
IdempotentCommandCachedResult(result, now));
+ idempotentCommandCache.put(commandId, result);
}
}
}
}
/**
* Removes obsolete entries from both volatile and persistent idempotent
command cache.
+ *
+ * @param evictionTimestamp Cached entries older than given timestamp will
be evicted.
+ * @param operationTimestamp Command operation timestamp.
*/
- // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Call on meta
storage compaction.
- void evictIdempotentCommandsCache() {
- HybridTimestamp cleanupTimestamp = clusterTime.now();
- LOG.info("Idempotent command cache cleanup started
[cleanupTimestamp={}].", cleanupTimestamp);
-
- maxClockSkewMillisFuture.thenAccept(maxClockSkewMillis -> {
- List<CommandId> commandIdsToRemove =
idempotentCommandCache.entrySet().stream()
- .filter(entry ->
entry.getValue().commandStartTime.getPhysical()
- <= cleanupTimestamp.getPhysical() -
(idempotentCacheTtl.value() + maxClockSkewMillis.getAsLong()))
- .map(Map.Entry::getKey)
+ void evictIdempotentCommandsCache(HybridTimestamp evictionTimestamp,
HybridTimestamp operationTimestamp) {
+ LOG.info("Idempotent command cache cleanup started
[evictionTimestamp={}].", evictionTimestamp);
+
+ long obsoleteRevision = storage.revisionByTimestamp(evictionTimestamp);
+
+ if (obsoleteRevision != -1) {
+ byte[] keyFrom = IDEMPOTENT_COMMAND_PREFIX_BYTES;
+ byte[] keyTo = storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES);
+
+ List<byte[]> evictionCandidateKeys = storage.range(keyFrom, keyTo,
obsoleteRevision).stream()
+ // Not sure whether it's possible to retrieve empty entry
here, thus !entry.empty() was added just in case.
+ .filter(entry -> !entry.tombstone() && !entry.empty())
+ .map(Entry::key)
.collect(toList());
- if (!commandIdsToRemove.isEmpty()) {
- List<byte[]> commandIdStorageKeys = commandIdsToRemove.stream()
- .map(commandId -> ArrayUtils.concat(new byte[]{},
ByteUtils.toBytes(commandId)))
- .collect(toList());
+ // TODO https://issues.apache.org/jira/browse/IGNITE-22828
+ evictionCandidateKeys.forEach(evictionCandidateKeyBytes -> {
+ byte[] commandIdBytes = copyOfRange(evictionCandidateKeyBytes,
IDEMPOTENT_COMMAND_PREFIX_BYTES.length,
Review Comment:
You mean adding
```
/**
* Deserializes an object from byte array using native java
serialization mechanism.
*
* @param bytes Byte array.
* @param from – the offset in the buffer of the first byte to read.
* @param length – the maximum number of bytes to read from the buffer.
* @return Object.
*/
public static <T> T fromBytes(byte[] bytes, int from, int length) {
try (
var bis = new ByteArrayInputStream(bytes, from, length);
var in = new ObjectInputStream(bis)
) {
return (T) in.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new IgniteInternalException("Could not deserialize an
object", e);
}
}
```
?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]