[ 
https://issues.apache.org/jira/browse/ARTEMIS-5694?focusedWorklogId=986905&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-986905
 ]

ASF GitHub Bot logged work on ARTEMIS-5694:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Oct/25 09:00
            Start Date: 13/Oct/25 09:00
    Worklog Time Spent: 10m 
      Work Description: brusdev commented on code in PR #5964:
URL: https://github.com/apache/activemq-artemis/pull/5964#discussion_r2425628509


##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/journal/TimedBufferMovementTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.soak.journal;
+
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncoderPersister;
+import org.apache.activemq.artemis.core.journal.LoaderCallback;
+import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import 
org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TimedBufferMovementTest extends ActiveMQTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   final ConcurrentHashMap<String, String> pendingCallbacks = new 
ConcurrentHashMap<>();
+
+   @Test
+   @Timeout(value = 2, unit = TimeUnit.MINUTES, threadMode = 
Timeout.ThreadMode.SEPARATE_THREAD)
+   public void testForceMoveNextFile() throws Exception {
+      int REGULAR_THREADS = 5;
+      int TX_THREADS = 5;
+
+      ExecutorService executorService = 
Executors.newFixedThreadPool(REGULAR_THREADS + TX_THREADS + 10);
+      runAfter(executorService::shutdownNow);
+      OrderedExecutorFactory orderedExecutorFactory = new 
OrderedExecutorFactory(executorService);
+
+      NIOSequentialFileFactory factory = new 
NIOSequentialFileFactory(getTestDirfile(), true, 1);
+      factory.start();
+      runAfter(factory::stop);
+
+      JournalImpl journal = new JournalImpl(orderedExecutorFactory, 1024 * 
1024, 10, 10, 3, 0, 50_000, factory, "coll", "data", 1, 0);
+      journal.start();
+      journal.load(new LoaderCallback() {
+         @Override
+         public void addPreparedTransaction(PreparedTransactionInfo 
preparedTransaction) {
+
+         }
+
+         @Override
+         public void addRecord(RecordInfo info) {
+
+         }
+
+         @Override
+         public void deleteRecord(long id) {
+
+         }
+
+         @Override
+         public void updateRecord(RecordInfo info) {
+
+         }
+
+         @Override
+         public void failedTransaction(long transactionID, List<RecordInfo> 
records, List<RecordInfo> recordsToDelete) {
+
+         }
+      });
+      AtomicInteger recordsWritten = new AtomicInteger(0);
+      AtomicInteger recordsCallback = new AtomicInteger(0);
+
+      int RECORDS = 500_000;
+      CountDownLatch done = new CountDownLatch(REGULAR_THREADS + TX_THREADS);
+
+      AtomicInteger errors = new AtomicInteger(0);
+
+      AtomicInteger sequence = new AtomicInteger(1);
+
+      for (int t = 0; t < REGULAR_THREADS; t++) {
+         executorService.execute(() -> {
+            try {
+               OperationContext context = new 
OperationContextImpl(orderedExecutorFactory.getExecutor());
+               for (int r = 0; r < RECORDS; r++) {
+                  String uuid = "noTX_ " + RandomUtil.randomUUIDString();
+                  pendingCallbacks.put(uuid, uuid);
+                  try {
+                     int add = sequence.incrementAndGet();
+                     journal.appendAddRecord(add, (byte) 0, new 
ByteArrayEncoding(new byte[5]), true, context);
+                     recordsWritten.incrementAndGet();
+                     context.executeOnCompletion(new IOCallback() {
+                        @Override
+                        public void done() {
+                           pendingCallbacks.remove(uuid);
+                           recordsCallback.incrementAndGet();
+                        }
+
+                        @Override
+                        public void onError(int errorCode, String 
errorMessage) {
+                           logger.warn("Error {}", errorCode);
+                           errors.incrementAndGet();
+                        }
+                     });
+                     journal.appendDeleteRecord(add, false);
+                  } catch (Throwable e) {
+                     logger.warn(e.getMessage(), e);
+                     errors.incrementAndGet();
+                  }
+               }
+            } finally {
+               done.countDown();
+            }
+         });
+      }
+
+      for (int t = 0; t < TX_THREADS; t++) {
+         executorService.execute(() -> {
+            try {
+               OperationContext context = new 
OperationContextImpl(orderedExecutorFactory.getExecutor());
+               for (int r = 0; r < RECORDS; r++) {
+                  String uuid = "tx_" + RandomUtil.randomUUIDString();
+                  try {
+                     long tx = sequence.incrementAndGet();
+                     pendingCallbacks.put(uuid, String.valueOf(tx));
+                     int add1 = sequence.incrementAndGet();
+                     int add2 = sequence.incrementAndGet();
+                     journal.appendAddRecordTransactional(tx, add1, (byte) 0, 
EncoderPersister.getInstance(), new ByteArrayEncoding(new byte[5]));
+                     journal.appendAddRecordTransactional(tx, add2, (byte) 0, 
EncoderPersister.getInstance(), new ByteArrayEncoding(new byte[5]));
+                     journal.appendCommitRecord(tx, true, context);
+                     recordsWritten.incrementAndGet();
+                     context.executeOnCompletion(new IOCallback() {
+                        @Override
+                        public void done() {
+                           pendingCallbacks.remove(uuid);
+                           recordsCallback.incrementAndGet();
+                        }
+
+                        @Override
+                        public void onError(int errorCode, String 
errorMessage) {
+                        }
+                     });
+                     journal.appendDeleteRecord(add1, false);
+                     journal.appendDeleteRecord(add2, false);
+                  } catch (Throwable e) {
+                     logger.warn(e.getMessage(), e);
+                     errors.incrementAndGet();
+                  }
+               }
+            } finally {
+               done.countDown();
+            }
+         });
+      }
+
+      int countRepeat = 0;
+      int missingData = 0;
+
+      while (!done.await(10, TimeUnit.MILLISECONDS) || 
!pendingCallbacks.isEmpty()) {
+         logger.debug("forcing situation, current written records = {}, with 
results = {}, callbacks={}", recordsWritten.get(), pendingCallbacks.size(), 
recordsCallback.get());
+         if (countRepeat++ < 10) { // compact a few times
+            journal.scheduleCompactAndBlock(500_000);
+         }
+         // we will keep forcing this method (which will move to a next file)
+         // to introduce possible races
+         journal.forceBackup(1, TimeUnit.SECONDS);
+
+         // If the issue was happening, this would print the IDs that are 
missing
+         if (pendingCallbacks.size() < 10 && !pendingCallbacks.isEmpty()  && 
done.getCount() == 0) {
+            if (missingData++ > 5) {
+               // lets give a chance for the test to finish, otherwise it 
would never finish
+               break;
+            }
+            pendingCallbacks.forEach((a, b) -> {
+               logger.info("ID {} with tx={} still in the list", a, b);
+            });
+         }
+      }
+
+      assertTrue(done.await(1, TimeUnit.MINUTES));
+      Wait.assertEquals(0, pendingCallbacks::size);
+      journal.stop();
+      factory.stop();

Review Comment:
   `factory.stop()` is also executed by `runAfter(factory::stop);`, so this explicit call is redundant and can be removed.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 986905)
    Time Spent: 2h 10m  (was: 2h)

> Lingering sessions after a storage timeout
> ------------------------------------------
>
>                 Key: ARTEMIS-5694
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-5694
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.42.0
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.43.0
>
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> We had reports of sessions not closing for an unknown reason, probably related to
> storage.
> I am adding a timeout verification and extra logging in case this happens.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to