This is an automated email from the ASF dual-hosted git repository. klease pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 993689663de CAMEL-17613: Race condition in AggregateProcessor with Jdbc Repository (#7494) 993689663de is described below commit 993689663deb0e394e066659db9c0fb4817465b8 Author: klease <38634989+kle...@users.noreply.github.com> AuthorDate: Tue Apr 26 09:18:34 2022 +0200 CAMEL-17613: Race condition in AggregateProcessor with Jdbc Repository (#7494) Add a new method to RecoverableAggregationRepository which returns a boolean to indicate if the exchange was successfully removed from the completed repository. If the remove failed, retry it in the RecoverTask, but do not redeliver the exchange. Add a unit test (provided by Benjamin Bonnet) to simulate this condition. --- components/camel-sql/pom.xml | 6 + .../aggregate/jdbc/JdbcAggregationRepository.java | 13 +- .../jdbc/JdbcRemoveConfirmOrderAggregateTest.java | 131 +++++++++++++++++++++ .../aggregate/jdbc/JdbcSpringDataSource.xml | 24 ++++ .../spi/RecoverableAggregationRepository.java | 15 +++ .../processor/aggregate/AggregateProcessor.java | 38 +++++- 6 files changed, 221 insertions(+), 6 deletions(-) diff --git a/components/camel-sql/pom.xml b/components/camel-sql/pom.xml index 9cd35aaba0f..ea5174a030f 100644 --- a/components/camel-sql/pom.xml +++ b/components/camel-sql/pom.xml @@ -87,6 +87,12 @@ <artifactId>awaitility</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + <version>2.1.210</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java index 4e723785106..bc99f2deab8 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java +++ 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java @@ -388,6 +388,7 @@ public class JdbcAggregationRepository extends ServiceSupport key, version); insert(camelContext, confirmKey, exchange, getRepositoryNameCompleted(), version); + LOG.debug("Removed key {}", key); } catch (Exception e) { throw new RuntimeException("Error removing key " + key + " from repository " + repositoryName, e); @@ -398,8 +399,13 @@ public class JdbcAggregationRepository extends ServiceSupport @Override public void confirm(final CamelContext camelContext, final String exchangeId) { - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { + confirmWithResult(camelContext, exchangeId); + } + + @Override + public boolean confirmWithResult(final CamelContext camelContext, final String exchangeId) { + return transactionTemplate.execute(new TransactionCallback<Boolean>() { + public Boolean doInTransaction(TransactionStatus status) { LOG.debug("Confirming exchangeId {}", exchangeId); final String confirmKey = exchangeId; final int mustBeOne = jdbcTemplate @@ -407,8 +413,9 @@ public class JdbcAggregationRepository extends ServiceSupport if (mustBeOne != 1) { LOG.error("problem removing row " + confirmKey + " from " + getRepositoryNameCompleted() + " - DELETE statement did not return 1 but " + mustBeOne); + return false; } - + return true; } }); } diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcRemoveConfirmOrderAggregateTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcRemoveConfirmOrderAggregateTest.java new file mode 100644 index 00000000000..8c93ae79627 --- /dev/null +++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcRemoveConfirmOrderAggregateTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate.jdbc; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.concurrent.TimeUnit; + +import javax.sql.DataSource; + +import org.apache.camel.AggregationStrategy; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.transaction.support.DefaultTransactionStatus; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.fail; + +public class JdbcRemoveConfirmOrderAggregateTest extends AbstractJdbcAggregationTestSupport { + + public static class SlowCommitDataSourceTransactionManager extends DataSourceTransactionManager { + int count; + + @Override + protected void doCommit(DefaultTransactionStatus status) { + if ("main".equals(Thread.currentThread().getName()) && ++count == 2) { + try { + LOG.debug("sleeping while committing..."); + Thread.sleep(300); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + 
super.doCommit(status); + } + } + + public static class MyAggregationStrategyWithDelay extends MyAggregationStrategy implements AggregationStrategy { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + try { + // The recovery thread has an initial delay of 1 sec + LOG.debug("Delaying during aggregate"); + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return super.aggregate(oldExchange, newExchange); + } + + @Override + public void onCompletion(Exchange oldExchange) { + JdbcRemoveConfirmOrderAggregateTest.completedExchangeCount++; + } + } + + static int completedExchangeCount; + private static final Logger LOG = LoggerFactory.getLogger(JdbcRemoveConfirmOrderAggregateTest.class.getName()); + + @Override + void configureJdbcAggregationRepository() { + repo = applicationContext.getBean("repoSlowCommit", JdbcAggregationRepository.class); + // enable recovery + repo.setUseRecovery(true); + // check faster + repo.setRecoveryInterval(400, TimeUnit.MILLISECONDS); + } + + @Test + public void testJdbcAggregateRecover() throws Exception { + + getMockEndpoint("mock:result").expectedBodiesReceived("AB"); + + template.sendBodyAndHeader("direct:start", "A", "id", 123); + template.sendBodyAndHeader("direct:start", "B", "id", 123); + assertMockEndpointsSatisfied(10, TimeUnit.SECONDS); + // Wait until the recovery has been run + await().atMost(500, TimeUnit.MILLISECONDS).until(this::checkCompletedNotPresent); + Assertions.assertEquals(1, JdbcRemoveConfirmOrderAggregateTest.completedExchangeCount, + "There should be only 1 completed aggregation"); + } + + private boolean checkCompletedNotPresent() { + DataSource datasource = applicationContext.getBean("JdbcRemoveConfirmOrderAggregateTest-dataSourceSlow", + DataSource.class); + try { + Connection connection = datasource.getConnection(); + ResultSet rs = connection.createStatement() + .executeQuery("SELECT * FROM aggregationRepo1_completed"); + return 
!rs.next(); + } catch (SQLException e) { + fail(e); + return false; + } + + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start"). + threads(2). + transacted("required").aggregate(header("id"), new MyAggregationStrategyWithDelay()).completionSize(2).aggregationRepository(repo) + .optimisticLocking().to("mock:result").end(); + } + }; + } +} diff --git a/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml b/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml index 45648cb28b4..cde76936a00 100644 --- a/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml +++ b/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml @@ -60,6 +60,12 @@ <jdbc:embedded-database id="{{testClassSimpleName}}-dataSource5" type="DERBY"> <jdbc:script location="classpath:/sql/init5.sql"/> </jdbc:embedded-database> + + <!-- In Memory Database #6 --> + <jdbc:embedded-database id="{{testClassSimpleName}}-dataSourceSlow" type="H2"> + <jdbc:script location="classpath:/sql/init.sql"/> + </jdbc:embedded-database> + <bean id="repo1" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="repositoryName" value="aggregationRepo1"/> @@ -113,6 +119,12 @@ <property name="transactionManager" ref="txManager5"/> <property name="dataSource" ref="{{testClassSimpleName}}-dataSource5"/> </bean> + + <bean id="repoSlowCommit" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> + <property name="repositoryName" value="aggregationRepo1"/> + <property name="transactionManager" ref="txManagerSlow"/> + <property name="dataSource" ref="{{testClassSimpleName}}-dataSourceSlow"/> + </bean> <bean id="txManager1" 
class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="{{testClassSimpleName}}-dataSource1"/> @@ -133,5 +145,17 @@ <bean id="txManager5" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="{{testClassSimpleName}}-dataSource5"/> </bean> + + <bean id="txManagerSlow" class="org.apache.camel.processor.aggregate.jdbc.JdbcRemoveConfirmOrderAggregateTest$SlowCommitDataSourceTransactionManager"> + <property name="dataSource" ref="{{testClassSimpleName}}-dataSourceSlow"/> + </bean> + + <bean id="required" + class="org.apache.camel.spring.spi.SpringTransactionPolicy"> + <property name="transactionManager" + ref="txManagerSlow" /> + <property name="propagationBehaviorName" + value="PROPAGATION_REQUIRED" /> + </bean> </beans> diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java b/core/camel-api/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java index 0b07a5cba80..8829d03fbdc 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java @@ -117,4 +117,19 @@ public interface RecoverableAggregationRepository extends AggregationRepository */ int getMaximumRedeliveries(); + /** + * Confirms the completion of the {@link Exchange} with a result. + * <p/> + * This method is invoked instead of confirm() if the repository is recoverable. This allows possible recovery of + * non-confirmed completed exchanges. + * + * @param camelContext the current CamelContext + * @param exchangeId exchange id to confirm + * @return true if the exchange was successfully removed, else false. 
+ */ + default boolean confirmWithResult(CamelContext camelContext, String exchangeId) { + confirm(camelContext, exchangeId); + return true; + } + } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index d579ae68d7c..8b971bd0c77 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -125,6 +125,7 @@ public class AggregateProcessor extends AsyncProcessorSupport private Map<String, String> closedCorrelationKeys; private final Set<String> batchConsumerCorrelationKeys = new ConcurrentSkipListSet<>(); private final Set<String> inProgressCompleteExchanges = ConcurrentHashMap.newKeySet(); + private final Set<String> unconfirmedCompleteExchanges = ConcurrentHashMap.newKeySet(); private final Set<String> inProgressCompleteExchangesForRecoveryTask = ConcurrentHashMap.newKeySet(); private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<>(); @@ -248,6 +249,7 @@ public class AggregateProcessor extends AsyncProcessorSupport private long completionTimeoutCheckerInterval = 1000; private ProducerTemplate deadLetterProducerTemplate; + private boolean isRecoverableRepository; public AggregateProcessor(CamelContext camelContext, AsyncProcessor processor, Expression correlationExpression, AggregationStrategy aggregationStrategy, @@ -580,7 +582,7 @@ public class AggregateProcessor extends AsyncProcessorSupport } // special for some repository implementations - if (aggregationRepository instanceof RecoverableAggregationRepository) { + if (isRecoverableRepository()) { boolean valid = oldExchange == null || answer.getExchangeId().equals(oldExchange.getExchangeId()); if (!valid && aggregateRepositoryWarned.compareAndSet(false, true)) 
{ LOG.warn( @@ -1100,6 +1102,11 @@ public class AggregateProcessor extends AsyncProcessorSupport public void setAggregationRepository(AggregationRepository aggregationRepository) { this.aggregationRepository = aggregationRepository; + this.isRecoverableRepository = aggregationRepository instanceof RecoverableAggregationRepository; + } + + private boolean isRecoverableRepository() { + return isRecoverableRepository; } public boolean isDiscardOnCompletionTimeout() { @@ -1216,9 +1223,19 @@ public class AggregateProcessor extends AsyncProcessorSupport // only confirm if we processed without a problem try { - aggregationRepository.confirm(exchange.getContext(), exchangeId); + boolean confirmed; + if (isRecoverableRepository()) { + confirmed = ((RecoverableAggregationRepository) aggregationRepository) + .confirmWithResult(exchange.getContext(), exchangeId); + } else { + aggregationRepository.confirm(exchange.getContext(), exchangeId); + confirmed = true; + } // and remove redelivery state as well redeliveryState.remove(exchangeId); + if (!confirmed) { + unconfirmedCompleteExchanges.add(exchangeId); + } } finally { // must remember to remove in progress when we are complete inProgressCompleteExchanges.remove(exchangeId); @@ -1373,6 +1390,8 @@ public class AggregateProcessor extends AsyncProcessorSupport recoveryInProgress.set(true); inProgressCompleteExchangesForRecoveryTask.clear(); inProgressCompleteExchangesForRecoveryTask.addAll(inProgressCompleteExchanges); + // These are delivered but still in complete repository! 
+ inProgressCompleteExchangesForRecoveryTask.addAll(unconfirmedCompleteExchanges); final Set<String> exchangeIds = recoverable.scan(camelContext); for (String exchangeId : exchangeIds) { @@ -1388,6 +1407,9 @@ public class AggregateProcessor extends AsyncProcessorSupport final boolean inProgress = inProgressCompleteExchangesForRecoveryTask.contains(exchangeId); if (inProgress) { LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId); + if (unconfirmedCompleteExchanges.contains(exchangeId)) { + retryConfirm(exchangeId); + } } else { LOG.debug("Loading aggregated exchange with id: {} to be recovered.", exchangeId); Exchange exchange = recoverable.recover(camelContext, exchangeId); @@ -1463,6 +1485,16 @@ public class AggregateProcessor extends AsyncProcessorSupport } LOG.trace("Recover check complete"); } + + private void retryConfirm(String exchangeId) { + // Confirm that the exchange was processed + if (recoverable.confirmWithResult(camelContext, exchangeId)) { + unconfirmedCompleteExchanges.remove(exchangeId); + LOG.debug("Removal of exchange {} confirmed.", exchangeId); + } else { + LOG.warn("Still unable to confirm removal of exchange {}.", exchangeId); + } + } } @Override @@ -1513,7 +1545,7 @@ public class AggregateProcessor extends AsyncProcessorSupport ServiceHelper.startService(aggregationStrategy, processor, aggregationRepository); // should we use recover checker - if (aggregationRepository instanceof RecoverableAggregationRepository) { + if (isRecoverableRepository()) { RecoverableAggregationRepository recoverable = (RecoverableAggregationRepository) aggregationRepository; if (recoverable.isUseRecovery()) { long interval = recoverable.getRecoveryIntervalInMillis();