Repository: activemq
Updated Branches:
  refs/heads/master 9e856290c -> 03a211ec0


AMQ-6317: Use an SQL Statement for each createSchemaStatement
closes #190


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/03a211ec
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/03a211ec
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/03a211ec

Branch: refs/heads/master
Commit: 03a211ec061d9ed49dc1ac16f171d8f4458483b8
Parents: 9e85629
Author: Jeroen Bastijns <[email protected]>
Authored: Thu Jun 9 11:09:18 2016 +0200
Committer: gtully <[email protected]>
Committed: Tue Jun 28 15:57:17 2016 +0100

----------------------------------------------------------------------
 activemq-jdbc-store/pom.xml                     |  26 +++
 .../store/jdbc/adapter/DefaultJDBCAdapter.java  | 109 +++++------
 .../DefaultJDBCAdapterDoCreateTablesTest.java   | 181 +++++++++++++++++++
 3 files changed, 265 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/03a211ec/activemq-jdbc-store/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/pom.xml b/activemq-jdbc-store/pom.xml
index 9e3ba0f..a22d657 100755
--- a/activemq-jdbc-store/pom.xml
+++ b/activemq-jdbc-store/pom.xml
@@ -49,6 +49,32 @@
       <artifactId>activeio-core</artifactId>
       <optional>true</optional>
     </dependency>
+
+    <!-- =============================== -->
+    <!-- Testing Dependencies -->
+    <!-- =============================== -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-broker</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <reporting>

http://git-wip-us.apache.org/repos/asf/activemq/blob/03a211ec/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
index facf969..57438bc 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.store.jdbc.adapter;
 
+import static javax.xml.bind.DatatypeConverter.parseBase64Binary;
+import static javax.xml.bind.DatatypeConverter.printBase64Binary;
+
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -37,7 +40,6 @@ import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.store.jdbc.JDBCAdapter;
 import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
 import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
-import org.apache.activemq.store.jdbc.JDBCMessageStore;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore;
 import org.apache.activemq.store.jdbc.Statements;
@@ -46,9 +48,6 @@ import org.apache.activemq.util.DataByteArrayOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static javax.xml.bind.DatatypeConverter.parseBase64Binary;
-import static javax.xml.bind.DatatypeConverter.printBase64Binary;
-
 /**
  * Implements all the default JDBC operations that are used by the 
JDBCPersistenceAdapter. <p/> sub-classing is
  * encouraged to override the default implementation of methods to account for 
differences in JDBC Driver
@@ -65,6 +64,7 @@ import static javax.xml.bind.DatatypeConverter.printBase64Binary;
 public class DefaultJDBCAdapter implements JDBCAdapter {
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultJDBCAdapter.class);
     public static final int MAX_ROWS = 
org.apache.activemq.ActiveMQPrefetchPolicy.MAX_PREFETCH_SIZE;
+    private static final String FAILURE_MESSAGE = "Failure was: %s Message: %s 
SQLState: %s Vendor code: %s";
     protected Statements statements;
     private boolean batchStatements = true;
     //This is deprecated and should be removed in a future release
@@ -82,58 +82,68 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     }
 
     @Override
-    public void doCreateTables(TransactionContext c) throws SQLException, 
IOException {
-        Statement s = null;
+    public void doCreateTables(TransactionContext transactionContext) throws 
SQLException, IOException {
         cleanupExclusiveLock.writeLock().lock();
         try {
-            // Check to see if the table already exists. If it does, then don't
-            // log warnings during startup.
-            // Need to run the scripts anyways since they may contain ALTER
-            // statements that upgrade a previous version
-            // of the table
-            boolean alreadyExists = false;
-            ResultSet rs = null;
-            try {
-                rs = c.getConnection().getMetaData().getTables(null, null, 
this.statements.getFullMessageTableName(),
-                        new String[] { "TABLE" });
-                alreadyExists = rs.next();
-            } catch (Throwable ignore) {
-            } finally {
-                close(rs);
-            }
-            s = c.getConnection().createStatement();
-            String[] createStatments = 
this.statements.getCreateSchemaStatements();
-            for (int i = 0; i < createStatments.length; i++) {
+            // Check to see if the table already exists. If it does, then 
don't log warnings during startup.
+            // Need to run the scripts anyways since they may contain ALTER 
statements that upgrade a previous version of the table
+            boolean messageTableAlreadyExists = 
messageTableAlreadyExists(transactionContext);
+
+            for (String createStatement : 
this.statements.getCreateSchemaStatements()) {
                 // This will fail usually since the tables will be
                 // created already.
-                try {
-                    LOG.debug("Executing SQL: " + createStatments[i]);
-                    s.execute(createStatments[i]);
-                } catch (SQLException e) {
-                    if (alreadyExists) {
-                        LOG.debug("Could not create JDBC tables; The message 
table already existed." + " Failure was: "
-                                + createStatments[i] + " Message: " + 
e.getMessage() + " SQLState: " + e.getSQLState()
-                                + " Vendor code: " + e.getErrorCode());
-                    } else {
-                        LOG.warn("Could not create JDBC tables; they could 
already exist." + " Failure was: "
-                                + createStatments[i] + " Message: " + 
e.getMessage() + " SQLState: " + e.getSQLState()
-                                + " Vendor code: " + e.getErrorCode());
-                        JDBCPersistenceAdapter.log("Failure details: ", e);
-                    }
-                }
-            }
-
-            // if autoCommit used do not call commit
-            if(!c.getConnection().getAutoCommit()){
-                c.getConnection().commit();
+                executeStatement(transactionContext, createStatement, 
messageTableAlreadyExists);
             }
 
         } finally {
             cleanupExclusiveLock.writeLock().unlock();
-            try {
-                s.close();
-            } catch (Throwable e) {
+        }
+    }
+
+    private boolean messageTableAlreadyExists(TransactionContext 
transactionContext) {
+        boolean alreadyExists = false;
+        ResultSet rs = null;
+        try {
+            rs = 
transactionContext.getConnection().getMetaData().getTables(null, null, 
this.statements.getFullMessageTableName(), new String[] { "TABLE" });
+            alreadyExists = rs.next();
+        } catch (Throwable ignore) {
+        } finally {
+            close(rs);
+        }
+        return alreadyExists;
+    }
+
+    private void executeStatement(TransactionContext transactionContext, 
String createStatement, boolean ignoreStatementExecutionFailure) throws 
IOException {
+        Statement statement = null;
+        try {
+            LOG.debug("Executing SQL: " + createStatement);
+            statement = transactionContext.getConnection().createStatement();
+            statement.execute(createStatement);
+
+            commitIfAutoCommitIsDisabled(transactionContext);
+        } catch (SQLException e) {
+            if (ignoreStatementExecutionFailure) {
+                LOG.debug("Could not create JDBC tables; The message table 
already existed. " + String.format(FAILURE_MESSAGE, createStatement, 
e.getMessage(), e.getSQLState(), e.getErrorCode()));
+            } else {
+                LOG.warn("Could not create JDBC tables; they could already 
exist. " + String.format(FAILURE_MESSAGE, createStatement, e.getMessage(), 
e.getSQLState(), e.getErrorCode()));
+                JDBCPersistenceAdapter.log("Failure details: ", e);
+            }
+        } finally {
+            closeStatement(statement);
+        }
+    }
+
+    private void closeStatement(Statement statement) {
+        try {
+            if (statement != null) {
+                statement.close();
             }
+        } catch (SQLException ignored) {}
+    }
+
+    private void commitIfAutoCommitIsDisabled(TransactionContext c) throws 
SQLException, IOException {
+        if (!c.getConnection().getAutoCommit()) {
+            c.getConnection().commit();
         }
     }
 
@@ -157,10 +167,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                     JDBCPersistenceAdapter.log("Failure details: ", e);
                 }
             }
-            // if autoCommit used do not call commit
-            if(!c.getConnection().getAutoCommit()){
-               c.getConnection().commit();
-            }
+            commitIfAutoCommitIsDisabled(c);
         } finally {
             cleanupExclusiveLock.writeLock().unlock();
             try {

http://git-wip-us.apache.org/repos/asf/activemq/blob/03a211ec/activemq-jdbc-store/src/test/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapterDoCreateTablesTest.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/test/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapterDoCreateTablesTest.java b/activemq-jdbc-store/src/test/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapterDoCreateTablesTest.java
new file mode 100644
index 0000000..39dc0e9
--- /dev/null
+++ b/activemq-jdbc-store/src/test/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapterDoCreateTablesTest.java
@@ -0,0 +1,181 @@
+package org.apache.activemq.store.jdbc.adapter;
+
+import static org.apache.log4j.Level.DEBUG;
+import static org.apache.log4j.Level.WARN;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import org.apache.activemq.store.jdbc.Statements;
+import org.apache.activemq.store.jdbc.TransactionContext;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultJDBCAdapterDoCreateTablesTest {
+
+       private static final String CREATE_STATEMENT1 = "createStatement1";
+       private static final String CREATE_STATEMENT2 = "createStatement2";
+       private static final String[] CREATE_STATEMENTS = new String[] { 
CREATE_STATEMENT1, CREATE_STATEMENT2 };
+       private static final int VENDOR_CODE = 1;
+       private static final String SQL_STATE = "SqlState";
+       private static final String MY_REASON = "MyReason";
+
+       private DefaultJDBCAdapter defaultJDBCAdapter;
+
+       private List<LoggingEvent> loggingEvents = new ArrayList<>();
+
+       @Mock
+       private ReadWriteLock readWriteLock;
+
+       @Mock
+       private Lock lock;
+
+       @Mock
+       private TransactionContext transactionContext;
+
+       @Mock(answer = RETURNS_DEEP_STUBS)
+       private Connection connection;
+
+       @Mock
+       private Statements statements;
+
+       @Mock
+       private ResultSet resultSet;
+
+       @Mock
+       private Statement statement1, statement2;
+
+       @Before
+       public void setUp() throws IOException, SQLException {
+               DefaultTestAppender appender = new DefaultTestAppender() {
+                       @Override
+                       public void doAppend(LoggingEvent event) {
+                               loggingEvents.add(event);
+                       }
+               };
+               Logger rootLogger = Logger.getRootLogger();
+               rootLogger.setLevel(Level.DEBUG);
+               rootLogger.addAppender(appender);
+
+
+               defaultJDBCAdapter = new DefaultJDBCAdapter();
+               defaultJDBCAdapter.cleanupExclusiveLock = readWriteLock;
+               defaultJDBCAdapter.statements = statements;
+
+               
when(statements.getCreateSchemaStatements()).thenReturn(CREATE_STATEMENTS);
+               when(transactionContext.getConnection()).thenReturn(connection);
+               when(connection.getMetaData().getTables(null, null, 
this.statements.getFullMessageTableName(),new String[] { "TABLE" 
})).thenReturn(resultSet);
+               when(connection.createStatement()).thenReturn(statement1, 
statement2);
+               when(connection.getAutoCommit()).thenReturn(true);
+               when(readWriteLock.writeLock()).thenReturn(lock);
+       }
+
+       @After
+       public void tearDown() {
+               loggingEvents = new ArrayList<>();
+       }
+
+       @Test
+       public void 
createsTheTablesWhenNoMessageTableExistsAndLogsSqlExceptionsInWarnLevel() 
throws IOException, SQLException {
+               when(resultSet.next()).thenReturn(false);
+               when(statement2.execute(CREATE_STATEMENT2)).thenThrow(new 
SQLException(MY_REASON, SQL_STATE, VENDOR_CODE));
+
+               defaultJDBCAdapter.doCreateTables(transactionContext);
+
+               InOrder inOrder = inOrder(lock, resultSet, connection, 
statement1, statement2);
+               inOrder.verify(lock).lock();
+               inOrder.verify(resultSet).next();
+               inOrder.verify(resultSet).close();
+               inOrder.verify(connection).createStatement();
+               inOrder.verify(statement1).execute(CREATE_STATEMENT1);
+               inOrder.verify(statement1).close();
+               inOrder.verify(connection).createStatement();
+               inOrder.verify(statement2).execute(CREATE_STATEMENT2);
+               inOrder.verify(statement2).close();
+               inOrder.verify(lock).unlock();
+
+               assertEquals(4, loggingEvents.size());
+               assertLog(0, DEBUG, "Executing SQL: " + CREATE_STATEMENT1);
+               assertLog(1, DEBUG, "Executing SQL: " + CREATE_STATEMENT2);
+               assertLog(2, WARN, "Could not create JDBC tables; they could 
already exist. Failure was: " + CREATE_STATEMENT2 + " Message: " + MY_REASON + 
" SQLState: " + SQL_STATE + " Vendor code: " + VENDOR_CODE);
+               assertLog(3, WARN, "Failure details: " + MY_REASON);
+       }
+
+       @Test
+       public void 
triesTocreateTheTablesWhenMessageTableExistsAndLogsSqlExceptionsInDebugLevel() 
throws SQLException, IOException {
+               when(resultSet.next()).thenReturn(true);
+               when(statement1.execute(CREATE_STATEMENT1)).thenThrow(new 
SQLException(MY_REASON, SQL_STATE, VENDOR_CODE));
+
+               defaultJDBCAdapter.doCreateTables(transactionContext);
+
+               InOrder inOrder = inOrder(lock, resultSet, connection, 
statement1, statement2);
+               inOrder.verify(lock).lock();
+               inOrder.verify(resultSet).next();
+               inOrder.verify(resultSet).close();
+               inOrder.verify(connection).createStatement();
+               inOrder.verify(statement1).execute(CREATE_STATEMENT1);
+               inOrder.verify(statement1).close();
+               inOrder.verify(connection).createStatement();
+               inOrder.verify(statement2).execute(CREATE_STATEMENT2);
+               inOrder.verify(statement2).close();
+               inOrder.verify(lock).unlock();
+
+               assertEquals(3, loggingEvents.size());
+               assertLog(0, DEBUG, "Executing SQL: " + CREATE_STATEMENT1);
+               assertLog(1, DEBUG, "Could not create JDBC tables; The message 
table already existed. Failure was: " + CREATE_STATEMENT1 + " Message: " + 
MY_REASON + " SQLState: " + SQL_STATE + " Vendor code: " + VENDOR_CODE);
+               assertLog(2, DEBUG, "Executing SQL: " + CREATE_STATEMENT2);
+       }
+
+       @Test
+       public void commitsTheTransactionWhenAutoCommitIsDisabled() throws 
SQLException, IOException {
+               when(connection.getAutoCommit()).thenReturn(false);
+               when(resultSet.next()).thenReturn(false);
+
+               defaultJDBCAdapter.doCreateTables(transactionContext);
+
+               InOrder inOrder = inOrder(lock, resultSet, connection, 
statement1, statement2);
+               inOrder.verify(lock).lock();
+               inOrder.verify(resultSet).next();
+               inOrder.verify(resultSet).close();
+               inOrder.verify(connection).createStatement();
+               inOrder.verify(statement1).execute(CREATE_STATEMENT1);
+               inOrder.verify(connection).commit();
+               inOrder.verify(statement1).close();
+               inOrder.verify(connection).createStatement();
+               inOrder.verify(statement2).execute(CREATE_STATEMENT2);
+               inOrder.verify(connection).commit();
+               inOrder.verify(statement2).close();
+               inOrder.verify(lock).unlock();
+
+               assertEquals(2, loggingEvents.size());
+               assertLog(0, DEBUG, "Executing SQL: " + CREATE_STATEMENT1);
+               assertLog(1, DEBUG, "Executing SQL: " + CREATE_STATEMENT2);
+       }
+
+       private void assertLog(int messageNumber, Level level, String message) {
+               LoggingEvent loggingEvent = loggingEvents.get(messageNumber);
+               assertEquals(level, loggingEvent.getLevel());
+               assertEquals(message, loggingEvent.getMessage());
+       }
+}
\ No newline at end of file

Reply via email to