This is an automated email from the ASF dual-hosted git repository.

klease pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 993689663de CAMEL-17613: Race condition in AggregateProcessor with 
Jdbc Repository (#7494)
993689663de is described below

commit 993689663deb0e394e066659db9c0fb4817465b8
Author: klease <38634989+kle...@users.noreply.github.com>
AuthorDate: Tue Apr 26 09:18:34 2022 +0200

    CAMEL-17613: Race condition in AggregateProcessor with Jdbc Repository 
(#7494)
    
    Add a new method to RecoverableAggregateRepository which returns a boolean
    to indicate if the exchange was successfully removed from the completed
    repository. If the remove failed, retry it in the RecoverTask, but do not
    redeliver the exchange.
    Add a unit test (provided by Benjamin Bonnet) to simulate this condition.
---
 components/camel-sql/pom.xml                       |   6 +
 .../aggregate/jdbc/JdbcAggregationRepository.java  |  13 +-
 .../jdbc/JdbcRemoveConfirmOrderAggregateTest.java  | 131 +++++++++++++++++++++
 .../aggregate/jdbc/JdbcSpringDataSource.xml        |  24 ++++
 .../spi/RecoverableAggregationRepository.java      |  15 +++
 .../processor/aggregate/AggregateProcessor.java    |  38 +++++-
 6 files changed, 221 insertions(+), 6 deletions(-)

diff --git a/components/camel-sql/pom.xml b/components/camel-sql/pom.xml
index 9cd35aaba0f..ea5174a030f 100644
--- a/components/camel-sql/pom.xml
+++ b/components/camel-sql/pom.xml
@@ -87,6 +87,12 @@
             <artifactId>awaitility</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+            <version>2.1.210</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
index 4e723785106..bc99f2deab8 100644
--- 
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
@@ -388,6 +388,7 @@ public class JdbcAggregationRepository extends 
ServiceSupport
                             key, version);
 
                     insert(camelContext, confirmKey, exchange, 
getRepositoryNameCompleted(), version);
+                    LOG.debug("Removed key {}", key);
 
                 } catch (Exception e) {
                     throw new RuntimeException("Error removing key " + key + " 
from repository " + repositoryName, e);
@@ -398,8 +399,13 @@ public class JdbcAggregationRepository extends 
ServiceSupport
 
     @Override
     public void confirm(final CamelContext camelContext, final String 
exchangeId) {
-        transactionTemplate.execute(new TransactionCallbackWithoutResult() {
-            protected void doInTransactionWithoutResult(TransactionStatus 
status) {
+        confirmWithResult(camelContext, exchangeId);
+    }
+
+    @Override
+    public boolean confirmWithResult(final CamelContext camelContext, final 
String exchangeId) {
+        return transactionTemplate.execute(new TransactionCallback<Boolean>() {
+            public Boolean doInTransaction(TransactionStatus status) {
                 LOG.debug("Confirming exchangeId {}", exchangeId);
                 final String confirmKey = exchangeId;
                 final int mustBeOne = jdbcTemplate
@@ -407,8 +413,9 @@ public class JdbcAggregationRepository extends 
ServiceSupport
                 if (mustBeOne != 1) {
                     LOG.error("problem removing row " + confirmKey + " from " 
+ getRepositoryNameCompleted()
                               + " - DELETE statement did not return 1 but " + 
mustBeOne);
+                    return false;
                 }
-
+                return true;
             }
         });
     }
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcRemoveConfirmOrderAggregateTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcRemoveConfirmOrderAggregateTest.java
new file mode 100644
index 00000000000..8c93ae79627
--- /dev/null
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcRemoveConfirmOrderAggregateTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.jdbc;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.concurrent.TimeUnit;
+
+import javax.sql.DataSource;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.transaction.support.DefaultTransactionStatus;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class JdbcRemoveConfirmOrderAggregateTest extends 
AbstractJdbcAggregationTestSupport {
+
+    public static class SlowCommitDataSourceTransactionManager extends 
DataSourceTransactionManager {
+        int count;
+
+        @Override
+        protected void doCommit(DefaultTransactionStatus status) {
+            if ("main".equals(Thread.currentThread().getName()) && ++count == 
2) {
+                try {
+                    LOG.debug("sleeping while committing...");
+                    Thread.sleep(300);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+            super.doCommit(status);
+        }
+    }
+
+    public static class MyAggregationStrategyWithDelay extends 
MyAggregationStrategy implements AggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            try {
+                // The recovery thread has an initial delay of 1 sec
+                LOG.debug("Delaying during aggregate");
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            return super.aggregate(oldExchange, newExchange);
+        }
+
+        @Override
+        public void onCompletion(Exchange oldExchange) {
+            JdbcRemoveConfirmOrderAggregateTest.completedExchangeCount++;
+        }
+    }
+
+    static int completedExchangeCount;
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcRemoveConfirmOrderAggregateTest.class.getName());
+
+    @Override
+    void configureJdbcAggregationRepository() {
+        repo = applicationContext.getBean("repoSlowCommit", 
JdbcAggregationRepository.class);
+        // enable recovery
+        repo.setUseRecovery(true);
+        // check faster
+        repo.setRecoveryInterval(400, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testJdbcAggregateRecover() throws Exception {
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("AB");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        assertMockEndpointsSatisfied(10, TimeUnit.SECONDS);
+        // Wait until the recovery has been run
+        await().atMost(500, 
TimeUnit.MILLISECONDS).until(this::checkCompletedNotPresent);
+        Assertions.assertEquals(1, 
JdbcRemoveConfirmOrderAggregateTest.completedExchangeCount,
+                "There should be only 1 completed aggregation");
+    }
+
+    private boolean checkCompletedNotPresent() {
+        DataSource datasource = 
applicationContext.getBean("JdbcRemoveConfirmOrderAggregateTest-dataSourceSlow",
+                DataSource.class);
+        try {
+            Connection connection = datasource.getConnection();
+            ResultSet rs = connection.createStatement()
+                    .executeQuery("SELECT * FROM aggregationRepo1_completed");
+            return !rs.next();
+        } catch (SQLException e) {
+            fail(e);
+            return false;
+        }
+
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").
+                threads(2).
+                transacted("required").aggregate(header("id"), new 
MyAggregationStrategyWithDelay()).completionSize(2).aggregationRepository(repo)
+                    .optimisticLocking().to("mock:result").end();
+            }
+        };
+    }
+}
diff --git 
a/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml
 
b/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml
index 45648cb28b4..cde76936a00 100644
--- 
a/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml
+++ 
b/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml
@@ -60,6 +60,12 @@
     <jdbc:embedded-database id="{{testClassSimpleName}}-dataSource5" 
type="DERBY">
         <jdbc:script location="classpath:/sql/init5.sql"/>
     </jdbc:embedded-database>
+    
+    <!-- In Memory Database #6 -->
+    <jdbc:embedded-database id="{{testClassSimpleName}}-dataSourceSlow" 
type="H2">
+        <jdbc:script location="classpath:/sql/init.sql"/>
+    </jdbc:embedded-database>
+    
 
     <bean id="repo1" 
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
         <property name="repositoryName" value="aggregationRepo1"/>
@@ -113,6 +119,12 @@
         <property name="transactionManager" ref="txManager5"/>
         <property name="dataSource" ref="{{testClassSimpleName}}-dataSource5"/>
     </bean>
+    
+     <bean id="repoSlowCommit" 
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
+        <property name="repositoryName" value="aggregationRepo1"/>
+        <property name="transactionManager" ref="txManagerSlow"/>
+        <property name="dataSource" 
ref="{{testClassSimpleName}}-dataSourceSlow"/>
+    </bean>    
 
     <bean id="txManager1" 
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
         <property name="dataSource" ref="{{testClassSimpleName}}-dataSource1"/>
@@ -133,5 +145,17 @@
     <bean id="txManager5" 
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
         <property name="dataSource" ref="{{testClassSimpleName}}-dataSource5"/>
     </bean>
+    
+   <bean id="txManagerSlow" 
class="org.apache.camel.processor.aggregate.jdbc.JdbcRemoveConfirmOrderAggregateTest$SlowCommitDataSourceTransactionManager">
+        <property name="dataSource" 
ref="{{testClassSimpleName}}-dataSourceSlow"/>
+    </bean>
+
+    <bean id="required"
+        class="org.apache.camel.spring.spi.SpringTransactionPolicy">
+        <property name="transactionManager"
+            ref="txManagerSlow" />
+        <property name="propagationBehaviorName"
+            value="PROPAGATION_REQUIRED" />
+    </bean>
 
 </beans>
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
 
b/core/camel-api/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
index 0b07a5cba80..8829d03fbdc 100644
--- 
a/core/camel-api/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
+++ 
b/core/camel-api/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
@@ -117,4 +117,19 @@ public interface RecoverableAggregationRepository extends 
AggregationRepository
      */
     int getMaximumRedeliveries();
 
+    /**
+     * Confirms the completion of the {@link Exchange} with a result.
+     * <p/>
+     * This method is invoked instead of confirm() if the repository is 
recoverable. This allows possible recovery of
+     * non-confirmed completed exchanges.
+     *
+     * @param  camelContext the current CamelContext
+     * @param  exchangeId   exchange id to confirm
+     * @return              true if the exchange was successfully removed, 
else false.
+     */
+    default boolean confirmWithResult(CamelContext camelContext, String 
exchangeId) {
+        confirm(camelContext, exchangeId);
+        return true;
+    }
+
 }
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index d579ae68d7c..8b971bd0c77 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -125,6 +125,7 @@ public class AggregateProcessor extends 
AsyncProcessorSupport
     private Map<String, String> closedCorrelationKeys;
     private final Set<String> batchConsumerCorrelationKeys = new 
ConcurrentSkipListSet<>();
     private final Set<String> inProgressCompleteExchanges = 
ConcurrentHashMap.newKeySet();
+    private final Set<String> unconfirmedCompleteExchanges = 
ConcurrentHashMap.newKeySet();
     private final Set<String> inProgressCompleteExchangesForRecoveryTask = 
ConcurrentHashMap.newKeySet();
     private final Map<String, RedeliveryData> redeliveryState = new 
ConcurrentHashMap<>();
 
@@ -248,6 +249,7 @@ public class AggregateProcessor extends 
AsyncProcessorSupport
     private long completionTimeoutCheckerInterval = 1000;
 
     private ProducerTemplate deadLetterProducerTemplate;
+    private boolean isRecoverableRepository;
 
     public AggregateProcessor(CamelContext camelContext, AsyncProcessor 
processor,
                               Expression correlationExpression, 
AggregationStrategy aggregationStrategy,
@@ -580,7 +582,7 @@ public class AggregateProcessor extends 
AsyncProcessorSupport
         }
 
         // special for some repository implementations
-        if (aggregationRepository instanceof RecoverableAggregationRepository) 
{
+        if (isRecoverableRepository()) {
             boolean valid = oldExchange == null || 
answer.getExchangeId().equals(oldExchange.getExchangeId());
             if (!valid && aggregateRepositoryWarned.compareAndSet(false, 
true)) {
                 LOG.warn(
@@ -1100,6 +1102,11 @@ public class AggregateProcessor extends 
AsyncProcessorSupport
 
     public void setAggregationRepository(AggregationRepository 
aggregationRepository) {
         this.aggregationRepository = aggregationRepository;
+        this.isRecoverableRepository = aggregationRepository instanceof 
RecoverableAggregationRepository;
+    }
+
+    private boolean isRecoverableRepository() {
+        return isRecoverableRepository;
     }
 
     public boolean isDiscardOnCompletionTimeout() {
@@ -1216,9 +1223,19 @@ public class AggregateProcessor extends 
AsyncProcessorSupport
 
             // only confirm if we processed without a problem
             try {
-                aggregationRepository.confirm(exchange.getContext(), 
exchangeId);
+                boolean confirmed;
+                if (isRecoverableRepository()) {
+                    confirmed = ((RecoverableAggregationRepository) 
aggregationRepository)
+                            .confirmWithResult(exchange.getContext(), 
exchangeId);
+                } else {
+                    aggregationRepository.confirm(exchange.getContext(), 
exchangeId);
+                    confirmed = true;
+                }
                 // and remove redelivery state as well
                 redeliveryState.remove(exchangeId);
+                if (!confirmed) {
+                    unconfirmedCompleteExchanges.add(exchangeId);
+                }
             } finally {
                 // must remember to remove in progress when we are complete
                 inProgressCompleteExchanges.remove(exchangeId);
@@ -1373,6 +1390,8 @@ public class AggregateProcessor extends 
AsyncProcessorSupport
                 recoveryInProgress.set(true);
                 inProgressCompleteExchangesForRecoveryTask.clear();
                 
inProgressCompleteExchangesForRecoveryTask.addAll(inProgressCompleteExchanges);
+                // These are delivered but still in complete repository!
+                
inProgressCompleteExchangesForRecoveryTask.addAll(unconfirmedCompleteExchanges);
                 final Set<String> exchangeIds = recoverable.scan(camelContext);
                 for (String exchangeId : exchangeIds) {
 
@@ -1388,6 +1407,9 @@ public class AggregateProcessor extends 
AsyncProcessorSupport
                         final boolean inProgress = 
inProgressCompleteExchangesForRecoveryTask.contains(exchangeId);
                         if (inProgress) {
                             LOG.trace("Aggregated exchange with id: {} is 
already in progress.", exchangeId);
+                            if 
(unconfirmedCompleteExchanges.contains(exchangeId)) {
+                                retryConfirm(exchangeId);
+                            }
                         } else {
                             LOG.debug("Loading aggregated exchange with id: {} 
to be recovered.", exchangeId);
                             Exchange exchange = 
recoverable.recover(camelContext, exchangeId);
@@ -1463,6 +1485,16 @@ public class AggregateProcessor extends 
AsyncProcessorSupport
             }
             LOG.trace("Recover check complete");
         }
+
+        private void retryConfirm(String exchangeId) {
+            // Confirm that the exchange was processed
+            if (recoverable.confirmWithResult(camelContext, exchangeId)) {
+                unconfirmedCompleteExchanges.remove(exchangeId);
+                LOG.debug("Removal of exchange {} confirmed.", exchangeId);
+            } else {
+                LOG.warn("Still unable to confirm removal of exchange {}.", 
exchangeId);
+            }
+        }
     }
 
     @Override
@@ -1513,7 +1545,7 @@ public class AggregateProcessor extends 
AsyncProcessorSupport
         ServiceHelper.startService(aggregationStrategy, processor, 
aggregationRepository);
 
         // should we use recover checker
-        if (aggregationRepository instanceof RecoverableAggregationRepository) 
{
+        if (isRecoverableRepository()) {
             RecoverableAggregationRepository recoverable = 
(RecoverableAggregationRepository) aggregationRepository;
             if (recoverable.isUseRecovery()) {
                 long interval = recoverable.getRecoveryIntervalInMillis();

Reply via email to