[ https://issues.apache.org/jira/browse/ARTEMIS-3464?focusedWorklogId=648105&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-648105 ]
ASF GitHub Bot logged work on ARTEMIS-3464: ------------------------------------------- Author: ASF GitHub Bot Created on: 08/Sep/21 17:35 Start Date: 08/Sep/21 17:35 Worklog Time Spent: 10m Work Description: clebertsuconic commented on a change in pull request #3728: URL: https://github.com/apache/activemq-artemis/pull/3728#discussion_r704634791 ########## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java ########## @@ -112,6 +112,141 @@ public AtomicInteger getScheduledCleanupCount() { // Each CursorIterator will record their current PageReader in this map private final ConcurrentLongHashMap<PageReader> pageReaders = new ConcurrentLongHashMap<>(); + private final AtomicInteger scheduledScanCount = new AtomicInteger(0); + + private final LinkedList<PageScan> scanList = new LinkedList(); + + private static class PageScan { + final Comparable<PagedReference> scanFunction; + final Runnable found; + final Runnable notfound; + + public Comparable<PagedReference> getScanFunction() { + return scanFunction; + } + + public Runnable getFound() { + return found; + } + + public Runnable getNotfound() { + return notfound; + } + + PageScan(Comparable<PagedReference> scanFunction, Runnable found, Runnable notfound) { + this.scanFunction = scanFunction; + this.found = found; + this.notfound = notfound; + } + } + + @Override + public void addScanAck(Comparable<PagedReference> scanFunction, Runnable found, Runnable notfound) { + PageScan scan = new PageScan(scanFunction, found, notfound); + synchronized (scanList) { + scanList.add(scan); + } + } + + @Override + public void performScanAck() { + // we should only have a max of 2 scheduled tasks + // one that's might still be currently running, and another one lined up + // no need for more than that + if (scheduledScanCount.incrementAndGet() < 2) { + executor.execute(this::actualScanAck); + } else { + scheduledScanCount.decrementAndGet(); + } + } + + private void actualScanAck() { + try { + PageScan[] localScanList; + synchronized (scanList) { + if (scanList.size() == 0) { + return; + } + localScanList = scanList.stream().toArray(i -> new PageScan[i]); + scanList.clear(); + } + + LinkedList<Runnable> afterCommitList = new LinkedList<>(); + TransactionImpl tx = new TransactionImpl(store); + tx.addOperation(new TransactionOperationAbstract() { + @Override + public void afterCommit(Transaction tx) { + for (Runnable r : afterCommitList) { + try { + r.run(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + } + } + } + }); + PageIterator iterator = this.iterator(true); + boolean hasNext = iterator.hasNext(); + System.out.println("hasNext = " + hasNext); Review comment: note to myself: removing system.out -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 648105) Remaining Estimate: 0h Time Spent: 10m > Mirror could Miss Acks with Paging > ---------------------------------- > > Key: ARTEMIS-3464 > URL: https://issues.apache.org/jira/browse/ARTEMIS-3464 > Project: ActiveMQ Artemis > Issue Type: Bug > Components: AMQP > Affects Versions: 2.18.0 > Reporter: Clebert Suconic > Assignee: Clebert Suconic > Priority: Major > Fix For: 2.19.0 > > Time Spent: 10m > Remaining Estimate: 0h > > The Mirror target could miss acks in some cases with Paging. > We should add a scan for when the message couldn't be reached within depaged > messages. -- This message was sent by Atlassian Jira (v8.3.4#803005)