This is an automated email from the ASF dual-hosted git repository.

ggrzybek pushed a commit to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.x by this push:
     new 6f2348c  [CAMEL-13951] Implement PostgresAggregationRepository to 
handle special PostgreSQL behavior
6f2348c is described below

commit 6f2348cf09a14ba4863e2364289a151da865fbcd
Author: Grzegorz Grzybek <gr.grzy...@gmail.com>
AuthorDate: Mon Sep 9 14:58:20 2019 +0200

    [CAMEL-13951] Implement PostgresAggregationRepository to handle special 
PostgreSQL behavior
    
    (cherry picked from commit 6974f9b60a504eb967b5e643254c441040df7f9c)
---
 .../camel-sql/src/main/docs/sql-component.adoc     | 48 +++++++++++-
 .../aggregate/jdbc/JdbcAggregationRepository.java  | 19 +++--
 .../jdbc/PostgresAggregationRepository.java        | 91 ++++++++++++++++++++++
 3 files changed, 151 insertions(+), 7 deletions(-)

diff --git a/components/camel-sql/src/main/docs/sql-component.adoc 
b/components/camel-sql/src/main/docs/sql-component.adoc
index d9b32c9..a011fe4 100644
--- a/components/camel-sql/src/main/docs/sql-component.adoc
+++ b/components/camel-sql/src/main/docs/sql-component.adoc
@@ -785,9 +785,9 @@ JDBC vendor.
 <bean id="repo"
 class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
   <property name="transactionManager" ref="transactionManager"/>
-  <propertyname="repositoryName" value="aggregation"/>
+  <property name="repositoryName" value="aggregation"/>
   <property name="dataSource" ref="dataSource"/>
-  <property name"jdbcOptimisticLockingExceptionMapper" 
ref="myExceptionMapper"/>
+  <property name="jdbcOptimisticLockingExceptionMapper" 
ref="myExceptionMapper"/>
 </bean>
 <!-- use the default mapper with extraFQN class names from our JDBC driver -->
 <bean id="myExceptionMapper" 
class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper">
@@ -800,6 +800,50 @@ 
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
 </bean>
 -----
 
+=== Propagation behavior
+
+`JdbcAggregationRepository` uses two distinct _transaction templates_ from 
Spring-TX. One is read-only
+and one is used for read-write operations.
+
+However, when using `JdbcAggregationRepository` within a route that itself 
uses `<transacted />` and there's
+common `PlatformTransactionManager` used, there may be a need to configure 
_propagation behavior_ used by
+transaction templates inside `JdbcAggregationRepository`.
+
+Here's a way to do it:
+[source,xml]
+----
+<bean id="repo"
+class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
+  <property name="propagationBehaviorName" value="PROPAGATION_NESTED" />
+</bean>
+----
+
+Propagation is specified by constants of 
`org.springframework.transaction.TransactionDefinition` interface,
+so `propagationBehaviorName` is convenient setter that allows to use names of 
the constants.
+
+=== PostgreSQL case
+
+There's special database that may cause problems with optimistic locking used 
by `JdbcAggregationRepository`.
+PostgreSQL marks connection as invalid in case of data integrity violation 
exception (the one with SQLState 23505).
+This makes the connection effectively unusable within nested transaction.
+Details can be found
+https://www.postgresql.org/message-id/200609241203.59292.ralf.wiebicke%40exedio.com[in
 this document].
+
+`org.apache.camel.processor.aggregate.jdbc.PostgresAggregationRepository` 
extends `JdbcAggregationRepository` and
+uses special `INSERT .. ON CONFLICT ..` statement to provide optimistic 
locking behavior.
+
+This statement is (with default aggregation table definition):
+[source,sql]
+----
+INSERT INTO aggregation (id, exchange) values (?, ?) ON CONFLICT DO NOTHING
+----
+
+Details can be found https://www.postgresql.org/docs/9.5/sql-insert.html[in 
PostgreSQL documentation].
+
+When this clause is used, `java.sql.PreparedStatement.executeUpdate()` call 
returns `0` instead of throwing
+SQLException with SQLState=23505. Further handling is exactly the same as with 
generic `JdbcAggregationRepository`,
+but without marking PostgreSQL connection as invalid.
+
 == Camel Sql Starter
 
 A starter module is available to spring-boot users. When using the starter,
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
index 597767a..4e93389 100644
--- 
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
@@ -59,11 +59,11 @@ import 
org.springframework.transaction.support.TransactionTemplate;
  */
 public class JdbcAggregationRepository extends ServiceSupport implements 
RecoverableAggregationRepository, OptimisticLockingAggregationRepository {
 
+    protected static final String EXCHANGE = "exchange";
+    protected static final String ID = "id";
+    protected static final String BODY = "body";
 
     private static final Logger LOG = 
LoggerFactory.getLogger(JdbcAggregationRepository.class);
-    private static final String ID = "id";
-    private static final String EXCHANGE = "exchange";
-    private static final String BODY = "body";
     private static final Constants PROPAGATION_CONSTANTS = new 
Constants(TransactionDefinition.class);
 
     private JdbcOptimisticLockingExceptionMapper 
jdbcOptimisticLockingExceptionMapper = new 
DefaultJdbcOptimisticLockingExceptionMapper();
@@ -240,9 +240,9 @@ public class JdbcAggregationRepository extends 
ServiceSupport implements Recover
         insertAndUpdateHelper(camelContext, correlationId, exchange, sql, 
true);
     }
 
-    protected void insertAndUpdateHelper(final CamelContext camelContext, 
final String key, final Exchange exchange, String sql, final boolean 
idComesFirst) throws Exception {
+    protected int insertAndUpdateHelper(final CamelContext camelContext, final 
String key, final Exchange exchange, String sql, final boolean idComesFirst) 
throws Exception {
         final byte[] data = codec.marshallExchange(camelContext, exchange, 
allowSerializedHeaders);
-        jdbcTemplate.execute(sql,
+        Integer updateCount = jdbcTemplate.execute(sql,
                 new 
AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
                     @Override
                     protected void setValues(PreparedStatement ps, LobCreator 
lobCreator) throws SQLException {
@@ -265,6 +265,7 @@ public class JdbcAggregationRepository extends 
ServiceSupport implements Recover
                         }
                     }
                 });
+        return updateCount == null ? 0 : updateCount;
     }
 
     @Override
@@ -443,6 +444,10 @@ public class JdbcAggregationRepository extends 
ServiceSupport implements Recover
         return this.headersToStoreAsText != null && 
!this.headersToStoreAsText.isEmpty();
     }
 
+    public List<String> getHeadersToStoreAsText() {
+        return headersToStoreAsText;
+    }
+
     /**
      * Allows to store headers as String which is human readable. By default 
this option is disabled,
      * storing the headers in binary format.
@@ -453,6 +458,10 @@ public class JdbcAggregationRepository extends 
ServiceSupport implements Recover
         this.headersToStoreAsText = headersToStoreAsText;
     }
 
+    public boolean isStoreBodyAsText() {
+        return storeBodyAsText;
+    }
+
     /**
      * Whether to store the message body as String which is human readable.
      * By default this option is false storing the body in binary format.
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
new file mode 100644
index 0000000..f023432
--- /dev/null
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.jdbc;
+
+import javax.sql.DataSource;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.springframework.dao.DataIntegrityViolationException;
+import org.springframework.transaction.PlatformTransactionManager;
+
+/**
+ * PostgreSQL specific {@link JdbcAggregationRepository} that deals with SQL 
Violation Exceptions
+ * using special {@code INSERT INTO .. ON CONFLICT DO NOTHING} claues.
+ */
+public class PostgresAggregationRepository extends JdbcAggregationRepository {
+
+    /**
+     * Creates an aggregation repository
+     */
+    public PostgresAggregationRepository() {
+    }
+
+    /**
+     * Creates an aggregation repository with the three mandatory parameters
+     */
+    public PostgresAggregationRepository(PlatformTransactionManager 
transactionManager, String repositoryName, DataSource dataSource) {
+        super(transactionManager, repositoryName, dataSource);
+    }
+
+    /**
+     * Inserts a new record into the given repository table
+     *
+     * @param camelContext   the current CamelContext
+     * @param correlationId  the correlation key
+     * @param exchange       the aggregated exchange
+     * @param repositoryName The name of the table
+     */
+    protected void insert(final CamelContext camelContext, final String 
correlationId, final Exchange exchange, String repositoryName) throws Exception 
{
+        // The default totalParameterIndex is 2 for ID and Exchange. Depending 
on logic this will be increased
+        int totalParameterIndex = 2;
+        StringBuilder queryBuilder = new StringBuilder()
+                .append("INSERT INTO ").append(repositoryName)
+                .append('(')
+                .append(EXCHANGE).append(", ")
+                .append(ID);
+
+        if (isStoreBodyAsText()) {
+            queryBuilder.append(", ").append(BODY);
+            totalParameterIndex++;
+        }
+
+        if (hasHeadersToStoreAsText()) {
+            for (String headerName : getHeadersToStoreAsText()) {
+                queryBuilder.append(", ").append(headerName);
+                totalParameterIndex++;
+            }
+        }
+
+        queryBuilder.append(") VALUES (");
+
+        for (int i = 0; i < totalParameterIndex - 1; i++) {
+            queryBuilder.append("?, ");
+        }
+        queryBuilder.append("?)");
+
+        queryBuilder.append(" ON CONFLICT DO NOTHING");
+
+        String sql = queryBuilder.toString();
+
+        int updateCount = insertAndUpdateHelper(camelContext, correlationId, 
exchange, sql, true);
+        if (updateCount == 0 && getRepositoryName().equals(repositoryName)) {
+            throw new DataIntegrityViolationException("No row was inserted due 
to data violation");
+        }
+    }
+
+}

Reply via email to