This is an automated email from the ASF dual-hosted git repository. orudyy pushed a commit to branch 7.1.x in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit c2e9c3367be1bd0682f7ca44cff864ed93c5dda5 Author: overmeulen <overmeu...@murex.com> AuthorDate: Mon Apr 29 13:59:43 2019 +0200 QPID-8305: [Broker-J][JDBC Message Store] Performance regression when increasing the number of queues linked to a topic (cherry picked from commit fccae34c184dd45b2a057d7bf6658b152c687e11) --- .../store/jdbc/AbstractJDBCMessageStore.java | 73 ++++++++++++---------- 1 file changed, 39 insertions(+), 34 deletions(-) diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java index de8fef2..7c65499 100644 --- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java +++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java @@ -31,7 +31,9 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -600,35 +602,34 @@ public abstract class AbstractJDBCMessageStore implements MessageStore return new JDBCTransaction(); } - private void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws StoreException + private void enqueueMessages(ConnectionWrapper connWrapper, Map<Long, List<TransactionLogResource>> queuesPerMessage) throws StoreException { Connection conn = connWrapper.getConnection(); + String sql = String.format("INSERT INTO %s (queue_id, message_id) values (?,?)", getQueueEntryTableName()); - - try + try (PreparedStatement stmt = conn.prepareStatement(sql)) { - if (getLogger().isDebugEnabled()) + for(Long messageId : queuesPerMessage.keySet()) { - getLogger().debug("Enqueuing message {} on queue {} with id {} [Connection {}]", - messageId, queue.getName(), queue.getId(), conn); - } - - try (PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + getQueueEntryTableName() - + " (queue_id, message_id) values (?,?)")) - { - stmt.setString(1, queue.getId().toString()); - stmt.setLong(2, messageId); - stmt.executeUpdate(); + for(TransactionLogResource queue : queuesPerMessage.get(messageId)) + { + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Enqueuing message {} on queue {} with id {} [Connection {}]", + messageId, queue.getName(), queue.getId(), conn); + } + stmt.setString(1, queue.getId().toString()); + stmt.setLong(2, messageId); + stmt.addBatch(); + } } - + stmt.executeBatch(); } catch (SQLException e) { - getLogger().error("Failed to enqueue message {}", messageId, e); - throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + queue.getName() + " with id " + queue.getId() - + " to database", e); + getLogger().error("Failed to enqueue messages", e); + throw new StoreException("Error writing enqueued messages to database", e); } - } private void dequeueMessage(ConnectionWrapper connWrapper, final UUID queueId, @@ -1135,6 +1136,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore private int _storeSizeIncrease; private final List<Runnable> _preCommitActions = new ArrayList<>(); private final List<Runnable> _postCommitActions = new ArrayList<>(); + private final Map<Long, List<TransactionLogResource>> _messagesToEnqueue = new HashMap<>(); protected JDBCTransaction() { @@ -1156,25 +1158,27 @@ public abstract class AbstractJDBCMessageStore implements MessageStore final StoredMessage storedMessage = message.getStoredMessage(); if(storedMessage instanceof StoredJDBCMessage) { - _preCommitActions.add(new Runnable() - { - @Override - public void run() + _preCommitActions.add(() -> { + try { - try - { - ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection()); - _storeSizeIncrease += storedMessage.getContentSize(); - } - catch (SQLException e) - { - throw new StoreException("Exception on enqueuing message into message store" + _messageId, - e); - } + ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection()); + _storeSizeIncrease += storedMessage.getContentSize(); } + catch (SQLException e) + { + throw new StoreException("Exception on enqueuing message into message store" + _messageId, e); + } + }); + } + if(_messagesToEnqueue.isEmpty()) + { + _preCommitActions.add(() -> { + AbstractJDBCMessageStore.this.enqueueMessages(_connWrapper, _messagesToEnqueue); + _messagesToEnqueue.clear(); }); } - AbstractJDBCMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber()); + List<TransactionLogResource> queues = _messagesToEnqueue.computeIfAbsent(message.getMessageNumber(), messageId -> new ArrayList<>()); + queues.add(queue); return new JDBCEnqueueRecord(queue.getId(), message.getMessageNumber()); } @@ -1236,6 +1240,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { checkMessageStoreOpen(); _preCommitActions.clear(); + _messagesToEnqueue.clear(); AbstractJDBCMessageStore.this.abortTran(_connWrapper); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org