gemmellr commented on code in PR #5172:
URL: https://github.com/apache/activemq-artemis/pull/5172#discussion_r1732732990
##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -233,6 +233,9 @@ private synchronized void store(MapRecord<K, V> record) {
// callers must be synchronized
private void removed(MapRecord<K, V> record) {
+ if (logger.isTraceEnabled()) {
+ logger.info("Removing record {}", record, new Exception("trace"));
Review Comment:
Level mismatch between the gate and the actual logging: the gate checks
isTraceEnabled() but the message is logged with logger.info(...).
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java:
##########
@@ -260,12 +268,12 @@ private void validateExpireSet(long queueID,
JournalHashMap<AckRetry, AckRetry,
if (retry.getQueueAttempts() >=
configuration.getMirrorAckManagerQueueAttempts()) {
if (retry.attemptedPage() >=
configuration.getMirrorAckManagerPageAttempts()) {
if (logger.isDebugEnabled()) {
- logger.debug("Retried {} {} times, giving up on the entry
now", retry, retry.getPageAttempts());
+ logger.debug("Retried {} {} times, giving up on the entry
now. Configuration Page Attempts={}", retry, retry.getPageAttempts(),
configuration.getMirrorAckManagerPageAttempts());
}
retries.remove(retry);
} else {
if (logger.isDebugEnabled()) {
- logger.trace("Retry {} attempted {} times on paging", retry,
retry.getPageAttempts());
+ logger.trace("Retry {} attempted {} times on paging,
Configuration Page Attempts={}", retry, retry.getPageAttempts(),
configuration.getMirrorAckManagerPageAttempts());
Review Comment:
Also a (pre-existing) level mismatch between the gate and the actual log
method: the gate checks isDebugEnabled() but the call is logger.trace(...).
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java:
##########
@@ -35,7 +36,7 @@ public void executeOnCompletion(final IOCallback runnable) {
}
@Override
- public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
+ public void executeOnCompletion(IOCallback runnable,
OperationConsistencyLevel operationType) {
Review Comment:
operationType -> consistencyLevel
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -477,8 +480,11 @@ private boolean sendMessage(Message message,
DeliveryAnnotations deliveryAnnotat
message.setAddress(internalAddress);
}
+ // notice that MirrorTransaction is overriding getRequiredConsistency
that is being set to ignore Replication.
+ // that means in case the target server is using replication, we will
not wait for a roundtrip before the message is sent
+ // however we will wait the roundtrip before acking the message
+ // This is to alleviate a situation where messages would take too long
to be delivered and be ready for ack
Review Comment:
MirrorTransaction itself would probably benefit from some Javadoc to make
this more discoverable to anyone using it later.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -165,56 +185,74 @@ public synchronized void replicationDone() {
@Override
public void executeOnCompletion(IOCallback runnable) {
- executeOnCompletion(runnable, false);
+ executeOnCompletion(runnable, OperationConsistencyLevel.FULL);
}
@Override
- public void executeOnCompletion(final IOCallback completion, final boolean
storeOnly) {
+ public void executeOnCompletion(final IOCallback completion, final
OperationConsistencyLevel consistencyLevel) {
boolean executeNow = false;
synchronized (this) {
if (errorCode == -1) {
final long storeLined = STORE_LINEUP_UPDATER.get(this);
final long pageLined = PAGE_LINEUP_UPDATER.get(this);
final long replicationLined = REPLICATION_LINEUP_UPDATER.get(this);
- if (storeOnly) {
- if (storeOnlyTasks == null) {
- storeOnlyTasks = new LinkedList<>();
- }
- } else {
- if (tasks == null) {
- tasks = new LinkedList<>();
- minimalReplicated = replicationLined;
- minimalStore = storeLined;
- minimalPage = pageLined;
- }
- }
- // On this case, we can just execute the context directly
-
- if (replicationLined == replicated && storeLined == stored &&
pageLined == paged) {
- // We want to avoid the executor if everything is complete...
- // However, we can't execute the context if there are
executions pending
- // We need to use the executor on this case
- if (EXECUTORS_PENDING_UPDATER.get(this) == 0) {
- // No need to use an executor here or a context switch
- // there are no actions pending.. hence we can just execute
the task directly on the same thread
- executeNow = true;
- } else {
- execute(completion);
- }
- } else {
- if (storeOnly) {
- if (storeLined == stored &&
EXECUTORS_PENDING_UPDATER.get(this) == 0) {
- executeNow = true;
+ switch (consistencyLevel) {
+ case STORAGE:
+ if (storeOnlyTasks == null) {
+ storeOnlyTasks = new LinkedList<>();
+ }
+ if (storeLined == stored) {
+ if (hasNoPendingExecution()) {
+ executeNow = true;
+ } else {
+ execute(completion);
+ }
} else {
- assert !storeOnlyTasks.isEmpty() ?
storeOnlyTasks.peekLast().storeLined <= storeLined : true;
storeOnlyTasks.add(new StoreOnlyTaskHolder(completion,
storeLined));
}
- } else {
- // ensure total ordering
- assert validateTasksAdd(storeLined, replicationLined,
pageLined);
- tasks.add(new TaskHolder(completion, storeLined,
replicationLined, pageLined));
- }
+ break;
+
+ case IGNORE_REPLICATION:
+ if (ignoreReplicationTasks == null) {
+ ignoreReplicationTasks = new LinkedList<>();
+ }
+
+ if (storeLined == stored && pageLined == paged) {
+ if (hasNoPendingExecution()) {
+ // No need to use an executor here or a context switch
+ // there are no actions pending.. hence we can just
execute the task directly on the same thread
+ executeNow = true;
+ } else {
+ execute(completion);
+ }
+ } else {
+ ignoreReplicationTasks.add(new TaskHolder(completion,
storeLined, replicationLined, pageLined));
Review Comment:
The STORAGE case has its own task holder, to better capture its intent and
to reduce its memory usage. Should the IGNORE_REPLICATION case have one too?
##########
artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java:
##########
@@ -179,22 +181,22 @@ public void done() {
@Test
public void testCompletionLateStoreOnly() throws Exception {
- testCompletionLate(true);
+ testCompletionLate(OperationConsistencyLevel.STORAGE);
}
@Test
public void testCompletionLate() throws Exception {
- testCompletionLate(false);
+ testCompletionLate(OperationConsistencyLevel.FULL);
}
- private void testCompletionLate(boolean storeOnly) throws Exception {
+ private void testCompletionLate(OperationConsistencyLevel storeType) throws
Exception {
Review Comment:
storeType -> consistencyLevel
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java:
##########
@@ -2079,8 +2085,10 @@ public void executeOnCompletion(final IOCallback
runnable) {
}
@Override
- public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
- executeOnCompletion(runnable);
+ public void executeOnCompletion(IOCallback runnable,
OperationConsistencyLevel storeType) {
Review Comment:
storeType -> consistencyLevel
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java:
##########
@@ -154,7 +155,7 @@ public void done() {
}
@Override
- public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
+ public void executeOnCompletion(IOCallback runnable,
OperationConsistencyLevel operationType) {
Review Comment:
operationType -> consistencyLevel
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -262,46 +291,76 @@ private void checkStoreTasks() {
final long stored = this.stored;
for (int i = 0; i < size; i++) {
final StoreOnlyTaskHolder holder = storeOnlyTasks.peek();
- if (stored < holder.storeLined) {
- // fail fast: storeOnlyTasks are ordered by storeLined, there is
no need to continue
- return;
+ if (holder != null) {
Review Comment:
Was this actually needed? If the value is null, it could mean the list was
empty, which shouldn't happen given the size check above, or that we added a
null, which should be guarded against as all the adds are of new holders.
Currently it would also still poll afterwards (below), which feels a bit
weird, as it seems like something is seriously wrong if this was ever null, and
throwing would be appropriate.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -165,56 +185,74 @@ public synchronized void replicationDone() {
@Override
public void executeOnCompletion(IOCallback runnable) {
- executeOnCompletion(runnable, false);
+ executeOnCompletion(runnable, OperationConsistencyLevel.FULL);
}
@Override
- public void executeOnCompletion(final IOCallback completion, final boolean
storeOnly) {
+ public void executeOnCompletion(final IOCallback completion, final
OperationConsistencyLevel consistencyLevel) {
boolean executeNow = false;
synchronized (this) {
if (errorCode == -1) {
final long storeLined = STORE_LINEUP_UPDATER.get(this);
final long pageLined = PAGE_LINEUP_UPDATER.get(this);
final long replicationLined = REPLICATION_LINEUP_UPDATER.get(this);
- if (storeOnly) {
- if (storeOnlyTasks == null) {
- storeOnlyTasks = new LinkedList<>();
- }
- } else {
- if (tasks == null) {
- tasks = new LinkedList<>();
- minimalReplicated = replicationLined;
- minimalStore = storeLined;
- minimalPage = pageLined;
- }
- }
- // On this case, we can just execute the context directly
-
- if (replicationLined == replicated && storeLined == stored &&
pageLined == paged) {
- // We want to avoid the executor if everything is complete...
- // However, we can't execute the context if there are
executions pending
- // We need to use the executor on this case
- if (EXECUTORS_PENDING_UPDATER.get(this) == 0) {
- // No need to use an executor here or a context switch
- // there are no actions pending.. hence we can just execute
the task directly on the same thread
- executeNow = true;
- } else {
- execute(completion);
- }
- } else {
- if (storeOnly) {
- if (storeLined == stored &&
EXECUTORS_PENDING_UPDATER.get(this) == 0) {
- executeNow = true;
+ switch (consistencyLevel) {
+ case STORAGE:
+ if (storeOnlyTasks == null) {
+ storeOnlyTasks = new LinkedList<>();
+ }
+ if (storeLined == stored) {
+ if (hasNoPendingExecution()) {
+ executeNow = true;
+ } else {
+ execute(completion);
+ }
} else {
- assert !storeOnlyTasks.isEmpty() ?
storeOnlyTasks.peekLast().storeLined <= storeLined : true;
storeOnlyTasks.add(new StoreOnlyTaskHolder(completion,
storeLined));
}
- } else {
- // ensure total ordering
- assert validateTasksAdd(storeLined, replicationLined,
pageLined);
- tasks.add(new TaskHolder(completion, storeLined,
replicationLined, pageLined));
- }
+ break;
+
+ case IGNORE_REPLICATION:
+ if (ignoreReplicationTasks == null) {
+ ignoreReplicationTasks = new LinkedList<>();
+ }
+
+ if (storeLined == stored && pageLined == paged) {
+ if (hasNoPendingExecution()) {
+ // No need to use an executor here or a context switch
+ // there are no actions pending.. hence we can just
execute the task directly on the same thread
Review Comment:
If the comment is needed, it feels like it ought to be on at least the first
example in the method, rather than the second and third.
##########
tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java:
##########
@@ -6636,7 +6637,7 @@ public void executeOnCompletion(IOCallback runnable) {
}
@Override
- public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
+ public void executeOnCompletion(IOCallback runnable,
OperationConsistencyLevel storeOnly) {
Review Comment:
storeOnly -> consistencyLevel
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact