OOZIE-2854 Oozie should handle transient database problems (andras.piros via 
gezapeti)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/a54f7c20
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/a54f7c20
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/a54f7c20

Branch: refs/heads/master
Commit: a54f7c20d7c5d1d72f195d6d92001edd147bc304
Parents: a436d98
Author: Gezapeti Cseh <gezap...@gmail.com>
Authored: Wed Jul 12 09:16:10 2017 +0200
Committer: Gezapeti Cseh <gezap...@gmail.com>
Committed: Wed Jul 12 09:16:25 2017 +0200

----------------------------------------------------------------------
 core/pom.xml                                    |   6 +
 .../oozie/command/SkipCommitFaultInjection.java |  41 ++
 .../executor/jpa/JsonBeanPersisterExecutor.java |  42 +++
 .../oozie/executor/jpa/QueryExecutor.java       |  25 +-
 .../org/apache/oozie/service/JPAService.java    | 371 +++++++++++++------
 .../org/apache/oozie/sla/SLASummaryBean.java    |   5 +-
 .../org/apache/oozie/store/WorkflowStore.java   |   1 -
 .../oozie/util/db/BasicDataSourceWrapper.java   | 123 ++++++
 .../oozie/util/db/DatabaseRetryPredicate.java   |  51 +++
 .../oozie/util/db/FailingConnectionWrapper.java | 371 +++++++++++++++++++
 .../util/db/FailingHSQLDBDriverWrapper.java     |  37 ++
 .../util/db/FailingMySQLDriverWrapper.java      |  51 +++
 .../oozie/util/db/OperationRetryHandler.java    | 130 +++++++
 ...ceExceptionSubclassFilterRetryPredicate.java |  73 ++++
 .../apache/oozie/util/db/RetryAttemptState.java |  84 +++++
 .../oozie/util/db/RuntimeExceptionInjector.java |  77 ++++
 .../src/main/resources/META-INF/persistence.xml |  12 +-
 core/src/main/resources/oozie-default.xml       |  33 +-
 core/src/main/resources/oozie-log4j.properties  |   1 +
 .../oozie/command/SkipCommitFaultInjection.java |  41 --
 .../java/org/apache/oozie/test/XTestCase.java   | 106 +++---
 .../util/db/TestOozieDmlStatementPredicate.java |  63 ++++
 .../util/db/TestOperationRetryHandler.java      | 218 +++++++++++
 ...ceExceptionSubclassFilterRetryPredicate.java |  98 +++++
 .../oozie/util/db/TestRetryAttemptState.java    | 132 +++++++
 docs/src/site/twiki/AG_Install.twiki            |  31 ++
 minitest/pom.xml                                |   4 +-
 .../test/TestParallelJPAOperationRetries.java   | 144 +++++++
 .../org/apache/oozie/test/TestWorkflow.java     | 215 +++++++++++
 .../apache/oozie/test/TestWorkflowRetries.java  |  34 ++
 .../org/apache/oozie/test/WorkflowTest.java     | 188 ----------
 minitest/src/test/resources/fs-decision.xml     |  51 +++
 .../src/test/resources/hsqldb-oozie-site.xml    |   6 +-
 .../src/test/resources/oozie-log4j.properties   |   1 +
 .../test/resources/parallel-fs-and-shell.xml    |  73 ++++
 minitest/src/test/resources/wf-test.xml         |  51 ---
 pom.xml                                         |   2 +-
 release-log.txt                                 |   1 +
 38 files changed, 2492 insertions(+), 501 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index acddf34..7275775 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -280,6 +280,12 @@
         </dependency>
 
         <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
             <groupId>commons-logging</groupId>
             <artifactId>commons-logging</artifactId>
             <scope>compile</scope>

http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/command/SkipCommitFaultInjection.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/SkipCommitFaultInjection.java 
b/core/src/main/java/org/apache/oozie/command/SkipCommitFaultInjection.java
new file mode 100644
index 0000000..158fbfa
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/SkipCommitFaultInjection.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command;
+
+import org.apache.oozie.FaultInjection;
+
+public class SkipCommitFaultInjection extends FaultInjection { // Fault toggle driven by a JVM system property; JPAService.checkAndCommit consults it by class name before committing
+
+    public static final String ACTION_FAILOVER_FAULT_INJECTION = 
"oozie.fault.injection.action.failover"; // name of the system property read by activate()
+    // Static flag: one value shared by every instance of this class in the JVM.
+    private static boolean ACTIVE = false;
+
+    public boolean activate() { // Re-reads the system property (absent means false), stores the result and returns it
+        ACTIVE = 
Boolean.parseBoolean(System.getProperty(ACTION_FAILOVER_FAULT_INJECTION, 
"false"));
+        return ACTIVE;
+    }
+
+    public void deactivate() { // Switches the fault off regardless of the system property
+        ACTIVE = false;
+    }
+
+    public boolean isActive() { // Reports the flag as last set by activate() or deactivate()
+        return ACTIVE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/executor/jpa/JsonBeanPersisterExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/JsonBeanPersisterExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/JsonBeanPersisterExecutor.java
new file mode 100644
index 0000000..217e442
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/JsonBeanPersisterExecutor.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.executor.jpa;
+
+import javax.persistence.EntityManager;
+
+import org.apache.oozie.client.rest.JsonBean;
+
+public class JsonBeanPersisterExecutor implements JPAExecutor<Void> { // JPAExecutor that persists a single JsonBean and returns nothing
+    private final JsonBean bean;
+    // No null check here; the visible caller, QueryExecutor.insert, only constructs this for a non-null bean.
+    public JsonBeanPersisterExecutor(JsonBean bean) {
+        this.bean = bean;
+    }
+
+    @Override
+    public String getName() { // JPAService.execute uses this name in its trace log and instrumentation counter
+        return "JsonBeanPersisterExecutor";
+    }
+
+    @Override
+    public Void execute(EntityManager em) throws JPAExecutorException { // Only calls persist; begin/commit are done by the caller (JPAService.execute)
+        em.persist(bean);
+        return null;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/executor/jpa/QueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/QueryExecutor.java 
b/core/src/main/java/org/apache/oozie/executor/jpa/QueryExecutor.java
index 8d94c23..dfafea0 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/QueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/QueryExecutor.java
@@ -21,47 +21,26 @@ package org.apache.oozie.executor.jpa;
 import java.util.List;
 
 import javax.persistence.EntityManager;
-import javax.persistence.PersistenceException;
 import javax.persistence.Query;
 
-import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
-import org.apache.oozie.util.XLog;
 
 /**
  * Base Class of Query Executor
  */
 public abstract class QueryExecutor<T, E extends Enum<E>> {
-    private static XLog LOG;
 
     protected QueryExecutor() {
     }
 
     public abstract int executeUpdate(E namedQuery, T jobBean) throws 
JPAExecutorException;
 
-    public void insert(JsonBean bean) throws JPAExecutorException {
+    public void insert(final JsonBean bean) throws JPAExecutorException { // Persists bean through JPAService.execute; a null bean is a silent no-op
         if (bean != null) {
             JPAService jpaService = Services.get().get(JPAService.class);
-            EntityManager em = jpaService.getEntityManager();
-            try {
-                em.getTransaction().begin();
-                em.persist(bean);
-                em.getTransaction().commit();
-            }
-            catch (PersistenceException e) {
-                throw new JPAExecutorException(ErrorCode.E0603, e);
-            }
-            finally {
-                if (em.getTransaction().isActive()) {
-                    LOG.warn("insert ended with an active transaction, rolling 
back");
-                    em.getTransaction().rollback();
-                }
-                if (em.isOpen()) {
-                    em.close();
-                }
-            }
+            jpaService.execute(new JsonBeanPersisterExecutor(bean)); // replaces the hand-written begin/persist/commit/rollback/close sequence removed above
         }
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/service/JPAService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/JPAService.java 
b/core/src/main/java/org/apache/oozie/service/JPAService.java
index 028381d..93fe9da 100644
--- a/core/src/main/java/org/apache/oozie/service/JPAService.java
+++ b/core/src/main/java/org/apache/oozie/service/JPAService.java
@@ -23,14 +23,16 @@ import java.text.MessageFormat;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.Callable;
 
 import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
+import javax.persistence.EntityTransaction;
 import javax.persistence.NoResultException;
 import javax.persistence.Persistence;
-import javax.persistence.PersistenceException;
 import javax.persistence.Query;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.dbcp.BasicDataSource;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -45,6 +47,7 @@ import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.client.rest.JsonSLAEvent;
+import org.apache.oozie.command.SkipCommitFaultInjection;
 import org.apache.oozie.compression.CodecFactory;
 import org.apache.oozie.executor.jpa.JPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
@@ -54,7 +57,10 @@ import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.Instrumentable;
 import org.apache.oozie.util.Instrumentation;
 import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.db.OperationRetryHandler;
+import 
org.apache.oozie.util.db.PersistenceExceptionSubclassFilterRetryPredicate;
 import org.apache.openjpa.lib.jdbc.DecoratingDataSource;
+import org.apache.openjpa.persistence.InvalidStateException;
 import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
 
 /**
@@ -64,6 +70,10 @@ import 
org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
 public class JPAService implements Service, Instrumentable {
     private static final String INSTRUMENTATION_GROUP_JPA = "jpa";
 
+    public static final long DEFAULT_INITIAL_WAIT_TIME = 100; // fallback for INITIAL_WAIT_TIME in initRetryHandler; milliseconds per the key name
+    public static final long DEFAULT_MAX_WAIT_TIME = 30_000; // fallback for MAX_WAIT_TIME in initRetryHandler; milliseconds per the key name
+    public static final int DEFAULT_MAX_RETRY_COUNT = 1; // fallback for MAX_RETRY_COUNT in initRetryHandler
+
     public static final String CONF_DB_SCHEMA = "oozie.db.schema.name";
 
     public static final String CONF_PREFIX = Service.CONF_PREFIX + 
"JPAService.";
@@ -79,13 +89,16 @@ public class JPAService implements Service, Instrumentable {
     public static final String CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL = 
CONF_PREFIX + "validate.db.connection.eviction.interval";
     public static final String CONF_VALIDATE_DB_CONN_EVICTION_NUM = 
CONF_PREFIX + "validate.db.connection.eviction.num";
     public static final String CONF_OPENJPA_BROKER_IMPL = CONF_PREFIX + 
"openjpa.BrokerImpl";
-
-
+    public static final String INITIAL_WAIT_TIME = CONF_PREFIX + 
"retry.initial-wait-time.ms"; // configuration key read by initRetryHandler
+    public static final String MAX_WAIT_TIME = CONF_PREFIX + 
"maximum-wait-time.ms"; // NOTE(review): unlike its two siblings this key has no retry. segment - confirm it matches oozie-default.xml
+    public static final String MAX_RETRY_COUNT = CONF_PREFIX + 
"retry.max-retries"; // configuration key read by initRetryHandler
+    public static final String SKIP_COMMIT_FAULT_INJECTION_CLASS = 
SkipCommitFaultInjection.class.getName(); // class name passed to FaultInjection.isActive in checkAndCommit
 
     private EntityManagerFactory factory;
     private Instrumentation instr;
 
     private static XLog LOG;
+    private OperationRetryHandler retryHandler;
 
     /**
      * Return the public interface of the service.
@@ -97,7 +110,7 @@ public class JPAService implements Service, Instrumentable {
     }
 
     @Override
-    public void instrument(Instrumentation instr) {
+    public void instrument(final Instrumentation instr) {
         this.instr = instr;
 
         final BasicDataSource dataSource = getBasicDataSource();
@@ -121,10 +134,10 @@ public class JPAService implements Service, 
Instrumentable {
         // Get the BasicDataSource object; it could be wrapped in a 
DecoratingDataSource
         // It might also not be a BasicDataSource if the user configured 
something different
         BasicDataSource basicDataSource = null;
-        OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) 
factory;
-        Object connectionFactory = 
spi.getConfiguration().getConnectionFactory();
+        final OpenJPAEntityManagerFactorySPI spi = 
(OpenJPAEntityManagerFactorySPI) factory;
+        final Object connectionFactory = 
spi.getConfiguration().getConnectionFactory();
         if (connectionFactory instanceof DecoratingDataSource) {
-            DecoratingDataSource decoratingDataSource = (DecoratingDataSource) 
connectionFactory;
+            final DecoratingDataSource decoratingDataSource = 
(DecoratingDataSource) connectionFactory;
             basicDataSource = (BasicDataSource) 
decoratingDataSource.getInnermostDelegate();
         } else if (connectionFactory instanceof BasicDataSource) {
             basicDataSource = (BasicDataSource) connectionFactory;
@@ -137,22 +150,22 @@ public class JPAService implements Service, 
Instrumentable {
      *
      * @param services services instance.
      */
-    public void init(Services services) throws ServiceException {
+    public void init(final Services services) throws ServiceException {
         LOG = XLog.getLog(JPAService.class);
-        Configuration conf = services.getConf();
-        String dbSchema = ConfigurationService.get(conf, CONF_DB_SCHEMA);
+        final Configuration conf = services.getConf();
+        final String dbSchema = ConfigurationService.get(conf, CONF_DB_SCHEMA);
         String url = ConfigurationService.get(conf, CONF_URL);
-        String driver = ConfigurationService.get(conf, CONF_DRIVER);
-        String user = ConfigurationService.get(conf, CONF_USERNAME);
-        String password = ConfigurationService.getPassword(conf, 
CONF_PASSWORD).trim();
-        String maxConn = ConfigurationService.get(conf, 
CONF_MAX_ACTIVE_CONN).trim();
-        String dataSource = ConfigurationService.get(conf, 
CONF_CONN_DATA_SOURCE);
-        String connPropsConfig = ConfigurationService.get(conf, 
CONF_CONN_PROPERTIES);
-        String brokerImplConfig = ConfigurationService.get(conf, 
CONF_OPENJPA_BROKER_IMPL);
-        boolean autoSchemaCreation = ConfigurationService.getBoolean(conf, 
CONF_CREATE_DB_SCHEMA);
-        boolean validateDbConn = ConfigurationService.getBoolean(conf, 
CONF_VALIDATE_DB_CONN);
-        String evictionInterval = ConfigurationService.get(conf, 
CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL).trim();
-        String evictionNum = ConfigurationService.get(conf, 
CONF_VALIDATE_DB_CONN_EVICTION_NUM).trim();
+        final String driver = ConfigurationService.get(conf, CONF_DRIVER);
+        final String user = ConfigurationService.get(conf, CONF_USERNAME);
+        final String password = ConfigurationService.getPassword(conf, 
CONF_PASSWORD).trim();
+        final String maxConn = ConfigurationService.get(conf, 
CONF_MAX_ACTIVE_CONN).trim();
+        final String dataSource = ConfigurationService.get(conf, 
CONF_CONN_DATA_SOURCE);
+        final String connPropsConfig = ConfigurationService.get(conf, 
CONF_CONN_PROPERTIES);
+        final String brokerImplConfig = ConfigurationService.get(conf, 
CONF_OPENJPA_BROKER_IMPL);
+        final boolean autoSchemaCreation = 
ConfigurationService.getBoolean(conf, CONF_CREATE_DB_SCHEMA);
+        final boolean validateDbConn = ConfigurationService.getBoolean(conf, 
CONF_VALIDATE_DB_CONN);
+        final String evictionInterval = ConfigurationService.get(conf, 
CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL).trim();
+        final String evictionNum = ConfigurationService.get(conf, 
CONF_VALIDATE_DB_CONN_EVICTION_NUM).trim();
 
         if (!url.startsWith("jdbc:")) {
             throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC 
URL, must start with 'jdbc:'");
@@ -163,14 +176,14 @@ public class JPAService implements Service, 
Instrumentable {
         }
         dbType = dbType.substring(0, dbType.indexOf(":"));
 
-        String persistentUnit = "oozie-" + dbType;
+        final String persistentUnit = "oozie-" + dbType;
 
         // Checking existince of ORM file for DB type
-        String ormFile = "META-INF/" + persistentUnit + "-orm.xml";
+        final String ormFile = "META-INF/" + persistentUnit + "-orm.xml";
         try {
             IOUtils.getResourceAsStream(ormFile, -1);
         }
-        catch (IOException ex) {
+        catch (final IOException ex) {
             throw new ServiceException(ErrorCode.E0609, dbType, ormFile);
         }
 
@@ -182,7 +195,7 @@ public class JPAService implements Service, Instrumentable {
 
         String connProps = 
"DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}";
         connProps = MessageFormat.format(connProps, driver, url, user, 
password, maxConn);
-        Properties props = new Properties();
+        final Properties props = new Properties();
         if (autoSchemaCreation) {
             connProps += 
",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
             props.setProperty("openjpa.jdbc.SynchronizeMappings", 
"buildSchema(ForeignKeys=true)");
@@ -190,8 +203,8 @@ public class JPAService implements Service, Instrumentable {
         else if (validateDbConn) {
             // validation can be done only if the schema already exist, else a
             // connection cannot be obtained to create the schema.
-            String interval = "timeBetweenEvictionRunsMillis=" + 
evictionInterval;
-            String num = "numTestsPerEvictionRun=" + evictionNum;
+            final String interval = "timeBetweenEvictionRunsMillis=" + 
evictionInterval;
+            final String num = "numTestsPerEvictionRun=" + evictionNum;
             connProps += 
",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + 
num;
             connProps += ",ValidationQuery=select count(*) from VALIDATE_CONN";
             connProps = MessageFormat.format(connProps, dbSchema);
@@ -210,36 +223,76 @@ public class JPAService implements Service, 
Instrumentable {
             LOG.info("Setting openjpa.BrokerImpl to {0}", brokerImplConfig);
         }
 
+        initRetryHandler();
+
         factory = Persistence.createEntityManagerFactory(persistentUnit, 
props);
 
-        EntityManager entityManager = getEntityManager();
-        entityManager.find(WorkflowActionBean.class, 1);
-        entityManager.find(WorkflowJobBean.class, 1);
-        entityManager.find(CoordinatorActionBean.class, 1);
-        entityManager.find(CoordinatorJobBean.class, 1);
-        entityManager.find(SLAEventBean.class, 1);
-        entityManager.find(JsonSLAEvent.class, 1);
-        entityManager.find(BundleJobBean.class, 1);
-        entityManager.find(BundleActionBean.class, 1);
-        entityManager.find(SLARegistrationBean.class, 1);
-        entityManager.find(SLASummaryBean.class, 1);
+        final EntityManager entityManager = getEntityManager();
+        findRetrying(entityManager, WorkflowActionBean.class, 1);
+        findRetrying(entityManager, WorkflowJobBean.class, 1);
+        findRetrying(entityManager, CoordinatorActionBean.class, 1);
+        findRetrying(entityManager, CoordinatorJobBean.class, 1);
+        findRetrying(entityManager, SLAEventBean.class, 1);
+        findRetrying(entityManager, JsonSLAEvent.class, 1);
+        findRetrying(entityManager, BundleActionBean.class, 1);
+        findRetrying(entityManager, BundleJobBean.class, 1);
+        findRetrying(entityManager, SLARegistrationBean.class, 1);
+        findRetrying(entityManager, SLASummaryBean.class, 1);
 
         LOG.info(XLog.STD, "All entities initialized");
         // need to use a pseudo no-op transaction so all entities, datasource
         // and connection pool are initialized one time only
         entityManager.getTransaction().begin();
-        OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) 
factory;
+        final OpenJPAEntityManagerFactorySPI spi = 
(OpenJPAEntityManagerFactorySPI) factory;
         // Mask the password with '***'
-        String logMsg = 
spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", 
"Password=***,");
+        final String logMsg = 
spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", 
"Password=***,");
         LOG.info("JPA configuration: {0}", logMsg);
         entityManager.getTransaction().commit();
         entityManager.close();
         try {
             CodecFactory.initialize(conf);
         }
-        catch (Exception ex) {
+        catch (final Exception ex) {
             throw new ServiceException(ErrorCode.E0100, getClass().getName(), 
ex);
         }
+
+    }
+
+    private void initRetryHandler() { // Builds the retryHandler field from configuration, falling back to the DEFAULT_* constants; init calls this before creating the factory
+        final long initialWaitTime = 
ConfigurationService.getInt(INITIAL_WAIT_TIME, (int) DEFAULT_INITIAL_WAIT_TIME); // read as int, then widened to long
+        final long maxWaitTime = ConfigurationService.getInt(MAX_WAIT_TIME, 
(int) DEFAULT_MAX_WAIT_TIME); // read as int, then widened to long
+        final int maxRetryCount = ConfigurationService.getInt(MAX_RETRY_COUNT, 
DEFAULT_MAX_RETRY_COUNT);
+        // NOTE(review): the two literals below concatenate to ...ms,max sleep time... with no space after the comma - confirm and fix upstream.
+        LOG.info(XLog.STD, "Failing database operations will be retried {0} 
times, with an initial sleep time of {1} ms,"
+                + "max sleep time {2} ms", maxRetryCount, initialWaitTime, 
maxWaitTime);
+        retryHandler = new OperationRetryHandler(maxRetryCount,
+                initialWaitTime,
+                maxWaitTime,
+                new PersistenceExceptionSubclassFilterRetryPredicate()); // presumably selects which failures are retried; defined elsewhere - verify
+    }
+
+    private void findRetrying(final EntityManager entityManager, final Class 
entityClass, final int primaryKey) // Looks up one entity by key via retryHandler and discards the result. NOTE(review): entityClass is a raw Class
+            throws ServiceException {
+        try {
+            retryHandler.executeWithRetry(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    if (!entityManager.getTransaction().isActive()) { // begin only when no transaction is active; presumably so a repeated attempt does not begin twice
+                        entityManager.getTransaction().begin();
+                    }
+                    // The lookup result is ignored; only the side effect of touching the entity class matters to the caller.
+                    entityManager.find(entityClass, primaryKey);
+                    // Commit only if the transaction is still active after the lookup.
+                    if (entityManager.getTransaction().isActive()) {
+                        entityManager.getTransaction().commit();
+                    }
+                    return null;
+                }
+            });
+        }
+        catch (final Exception e) { // whatever executeWithRetry finally throws is wrapped as E0603 with the cause preserved
+            throw new ServiceException(ErrorCode.E0603, e);
+        }
+    }
 
     /**
@@ -247,7 +300,12 @@ public class JPAService implements Service, Instrumentable 
{
      */
     public void destroy() {
         if (factory != null && factory.isOpen()) {
-            factory.close();
+            try { // closing is now best-effort: an InvalidStateException no longer propagates out of destroy
+                factory.close();
+            }
+            catch (final InvalidStateException ise) { // only the message is logged as a warning; the stack trace is dropped
+                LOG.warn("Cannot close EntityManagerFactory. 
[ise.message={0}]", ise.getMessage());
+            }
         }
     }
 
@@ -258,28 +316,33 @@ public class JPAService implements Service, 
Instrumentable {
      * @return return value of the JPAExecutor.
      * @throws JPAExecutorException thrown if an jpa executor failed
      */
-    public <T> T execute(JPAExecutor<T> executor) throws JPAExecutorException {
-        EntityManager em = getEntityManager();
-        Instrumentation.Cron cron = new Instrumentation.Cron();
+    public <T> T execute(final JPAExecutor<T> executor) throws 
JPAExecutorException {
+        final EntityManager em = getEntityManager();
+        final Instrumentation.Cron cron = new Instrumentation.Cron();
         try {
             LOG.trace("Executing JPAExecutor [{0}]", executor.getName());
             if (instr != null) {
                 instr.incr(INSTRUMENTATION_GROUP_JPA, executor.getName(), 1);
             }
             cron.start();
-            em.getTransaction().begin();
-            T t = executor.execute(em);
-            if (em.getTransaction().isActive()) {
-                if 
(FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
-                    throw new RuntimeException("Skipping Commit for Failover 
Testing");
-                }
 
-                em.getTransaction().commit();
-            }
-            return t;
+            return retryHandler.executeWithRetry(new Callable<T>() {
+                @Override
+                public T call() throws Exception {
+                    if (!em.getTransaction().isActive()) {
+                        em.getTransaction().begin();
+                    }
+
+                    final T t = executor.execute(em);
+
+                    checkAndCommit(em.getTransaction());
+
+                    return t;
+                }
+            });
         }
-        catch (PersistenceException e) {
-            throw new JPAExecutorException(ErrorCode.E0603, e);
+        catch (final Exception e) {
+            throw getTargetException(e);
         }
         finally {
             cron.stop();
@@ -292,7 +355,7 @@ public class JPAService implements Service, Instrumentable {
                     em.getTransaction().rollback();
                 }
             }
-            catch (Exception ex) {
+            catch (final Exception ex) {
                 LOG.warn("Could not check/rollback transaction after 
JPAExecutor [{0}], {1}", executor.getName(), ex
                         .getMessage(), ex);
             }
@@ -304,13 +367,23 @@ public class JPAService implements Service, 
Instrumentable {
                     LOG.warn("JPAExecutor [{0}] closed the EntityManager, it 
should not!", executor.getName());
                 }
             }
-            catch (Exception ex) {
+            catch (final Exception ex) {
                 LOG.warn("Could not close EntityManager after JPAExecutor 
[{0}], {1}", executor.getName(), ex
                         .getMessage(), ex);
             }
         }
     }
 
+    private void checkAndCommit(final EntityTransaction tx) { // Commits tx if it is still active; shared by execute and executeUpdate
+        if (tx.isActive()) { // an inactive transaction is left untouched
+            if (FaultInjection.isActive(SKIP_COMMIT_FAULT_INJECTION_CLASS)) { // test hook: fail before committing while the skip-commit fault is active
+                throw new RuntimeException("Skipping Commit for Failover 
Testing");
+            }
+            // Normal path: no fault is active, so the transaction is committed.
+            tx.commit();
+        }
+    }
+
     /**
      * Execute an UPDATE query
      * @param namedQueryName the name of query to be executed
@@ -319,8 +392,8 @@ public class JPAService implements Service, Instrumentable {
      * @return Integer that query returns, which corresponds to the number of 
rows updated
      * @throws JPAExecutorException
      */
-    public int executeUpdate(String namedQueryName, Query query, EntityManager 
em) throws JPAExecutorException {
-        Instrumentation.Cron cron = new Instrumentation.Cron();
+    public int executeUpdate(final String namedQueryName, final Query query, 
final EntityManager em) throws JPAExecutorException {
+        final Instrumentation.Cron cron = new Instrumentation.Cron();
         try {
 
             LOG.trace("Executing Update/Delete Query [{0}]", namedQueryName);
@@ -328,18 +401,23 @@ public class JPAService implements Service, 
Instrumentable {
                 instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1);
             }
             cron.start();
-            em.getTransaction().begin();
-            int ret = query.executeUpdate();
-            if (em.getTransaction().isActive()) {
-                if 
(FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
-                    throw new RuntimeException("Skipping Commit for Failover 
Testing");
+
+            return retryHandler.executeWithRetry(new Callable<Integer>() {
+                @Override
+                public Integer call() throws Exception {
+                    if (!em.getTransaction().isActive()) {
+                        em.getTransaction().begin();
+                    }
+                    final int ret = query.executeUpdate();
+
+                    checkAndCommit(em.getTransaction());
+
+                    return ret;
                 }
-                em.getTransaction().commit();
-            }
-            return ret;
+            });
         }
-        catch (PersistenceException e) {
-            throw new JPAExecutorException(ErrorCode.E0603, e);
+        catch (final Exception e) {
+            throw getTargetException(e);
         }
         finally {
             processFinally(em, cron, namedQueryName, true);
@@ -350,7 +428,7 @@ public class JPAService implements Service, Instrumentable {
         E namedQuery;
         Query query;
 
-        public QueryEntry(E namedQuery, Query query) {
+        public QueryEntry(final E namedQuery, final Query query) { // Stores the query identifier together with its Query object; parameters are only made final, behavior is unchanged
             this.namedQuery = namedQuery;
             this.query = query;
         }
@@ -364,7 +442,10 @@ public class JPAService implements Service, Instrumentable 
{
         }
     }
 
-    private void processFinally(EntityManager em, Instrumentation.Cron cron, 
String name, boolean checkActive) {
+    private void processFinally(final EntityManager em,
+                                final Instrumentation.Cron cron,
+                                final String name,
+                                final boolean checkActive) {
         cron.stop();
         if (instr != null) {
             instr.addCron(INSTRUMENTATION_GROUP_JPA, name, cron);
@@ -376,7 +457,7 @@ public class JPAService implements Service, Instrumentable {
                     em.getTransaction().rollback();
                 }
             }
-            catch (Exception ex) {
+            catch (final Exception ex) {
                 LOG.warn("Could not check/rollback transaction after [{0}], 
{1}", name,
                         ex.getMessage(), ex);
             }
@@ -389,7 +470,7 @@ public class JPAService implements Service, Instrumentable {
                 LOG.warn("[{0}] closed the EntityManager, it should not!", 
name);
             }
         }
-        catch (Exception ex) {
+        catch (final Exception ex) {
             LOG.warn("Could not close EntityManager after [{0}], {1}", name, 
ex.getMessage(), ex);
         }
     }
@@ -402,41 +483,57 @@ public class JPAService implements Service, 
Instrumentable {
      * @param em Entity Manager
      * @throws JPAExecutorException
      */
-    public void executeBatchInsertUpdateDelete(Collection<JsonBean> 
insertBeans, List<QueryEntry> updateQueryList,
-            Collection<JsonBean> deleteBeans, EntityManager em) throws 
JPAExecutorException {
-        Instrumentation.Cron cron = new Instrumentation.Cron();
+    public void executeBatchInsertUpdateDelete(final Collection<JsonBean> 
insertBeans, final List<QueryEntry> updateQueryList,
+            final Collection<JsonBean> deleteBeans, final EntityManager em) 
throws JPAExecutorException {
+        final Instrumentation.Cron cron = new Instrumentation.Cron();
         try {
 
             LOG.trace("Executing Queries in Batch");
             cron.start();
-            em.getTransaction().begin();
-            if (updateQueryList != null && updateQueryList.size() > 0) {
-                for (QueryEntry q : updateQueryList) {
-                    if (instr != null) {
-                        instr.incr(INSTRUMENTATION_GROUP_JPA, 
q.getQueryName().name(), 1);
+
+            retryHandler.executeWithRetry(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    if (em.getTransaction().isActive()) {
+                        try {
+                            em.getTransaction().rollback();
+                        }
+                        catch (final Exception e) {
+                            LOG.warn("Rollback failed - ignoring");
+                        }
                     }
-                    q.getQuery().executeUpdate();
-                }
-            }
-            if (insertBeans != null && insertBeans.size() > 0) {
-                for (JsonBean bean : insertBeans) {
-                    em.persist(bean);
-                }
-            }
-            if (deleteBeans != null && deleteBeans.size() > 0) {
-                for (JsonBean bean : deleteBeans) {
-                    em.remove(em.merge(bean));
-                }
-            }
-            if (em.getTransaction().isActive()) {
-                if 
(FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
-                    throw new RuntimeException("Skipping Commit for Failover 
Testing");
+
+                    em.getTransaction().begin();
+
+                    if (CollectionUtils.isNotEmpty(updateQueryList)) {
+                        for (final QueryEntry q : updateQueryList) {
+                            if (instr != null) {
+                                instr.incr(INSTRUMENTATION_GROUP_JPA, 
q.getQueryName().name(), 1);
+                            }
+                            q.getQuery().executeUpdate();
+                        }
+                    }
+
+                    if (CollectionUtils.isNotEmpty(insertBeans)) {
+                        for (final JsonBean bean : insertBeans) {
+                            em.persist(bean);
+                        }
+                    }
+
+                    if (CollectionUtils.isNotEmpty(deleteBeans)) {
+                        for (final JsonBean bean : deleteBeans) {
+                            em.remove(em.merge(bean));
+                        }
+                    }
+
+                    checkAndCommit(em.getTransaction());
+
+                    return null;
                 }
-                em.getTransaction().commit();
-            }
+            });
         }
-        catch (PersistenceException e) {
-            throw new JPAExecutorException(ErrorCode.E0603, e);
+        catch (final Exception e) {
+            throw getTargetException(e);
         }
         finally {
             processFinally(em, cron, "batchqueryexecutor", true);
@@ -450,24 +547,33 @@ public class JPAService implements Service, 
Instrumentable {
      * @param em Entity Manager
      * @return object that matches the query
      */
-    public Object executeGet(String namedQueryName, Query query, EntityManager 
em) {
-        Instrumentation.Cron cron = new Instrumentation.Cron();
+    public Object executeGet(final String namedQueryName, final Query query, 
final EntityManager em) throws JPAExecutorException {
+        final Instrumentation.Cron cron = new Instrumentation.Cron();
         try {
-
             LOG.trace("Executing Select Query to Get a Single row  [{0}]", 
namedQueryName);
             if (instr != null) {
                 instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1);
             }
 
             cron.start();
-            Object obj = null;
-            try {
-                obj = query.getSingleResult();
-            }
-            catch (NoResultException e) {
-                // return null when no matched result
-            }
-            return obj;
+
+            return retryHandler.executeWithRetry(new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
+                    Object obj = null;
+                    try {
+                        obj = query.getSingleResult();
+                    }
+                    catch (final NoResultException e) {
+                        LOG.info("No results found");
+                        // return null when no matched result
+                    }
+                    return obj;
+                }
+            });
+        }
+        catch (final Exception e) {
+            throw getTargetException(e);
         }
         finally {
             processFinally(em, cron, namedQueryName, false);
@@ -481,8 +587,9 @@ public class JPAService implements Service, Instrumentable {
      * @param em Entity Manager
      * @return list containing results that match the query
      */
-    public List<?> executeGetList(String namedQueryName, Query query, 
EntityManager em) {
-        Instrumentation.Cron cron = new Instrumentation.Cron();
+    public List<?> executeGetList(final String namedQueryName, final Query 
query, final EntityManager em)
+            throws JPAExecutorException {
+        final Instrumentation.Cron cron = new Instrumentation.Cron();
         try {
 
             LOG.trace("Executing Select Query to Get Multiple Rows [{0}]", 
namedQueryName);
@@ -491,14 +598,24 @@ public class JPAService implements Service, 
Instrumentable {
             }
 
             cron.start();
-            List<?> resultList = null;
-            try {
-                resultList = query.getResultList();
-            }
-            catch (NoResultException e) {
-                // return null when no matched result
-            }
-            return resultList;
+
+            return retryHandler.executeWithRetry(new Callable<List<?>>() {
+                @Override
+                public List<?> call() throws Exception {
+                    List<?> resultList = null;
+                    try {
+                        resultList = query.getResultList();
+                    }
+                    catch (final NoResultException e) {
+                        LOG.info("No results found");
+                        // return null when no matched result
+                    }
+                    return resultList;
+                }
+            });
+        }
+        catch (final Exception e) {
+            throw getTargetException(e);
         }
         finally {
             processFinally(em, cron, namedQueryName, false);
@@ -514,4 +631,12 @@ public class JPAService implements Service, Instrumentable 
{
         return factory.createEntityManager();
     }
 
+    /**
+     * Converts any exception raised around a JPA operation into a {@link JPAExecutorException}.
+     *
+     * @param e the exception caught while executing the operation
+     * @return {@code e} itself when it already is a {@link JPAExecutorException}; otherwise a new
+     *         {@link JPAExecutorException} with {@link ErrorCode#E0603} built from {@code e}'s message
+     */
+    private JPAExecutorException getTargetException(final Exception e) {
+        if (e instanceof JPAExecutorException) {
+            return (JPAExecutorException) e;
+        }
+
+        // Hand over the exception itself as the trailing argument (as the pre-retry code did) so that the
+        // original failure is not reduced to its message only.
+        // NOTE(review): relies on the exception constructor treating a trailing Throwable as the cause - confirm.
+        return new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java 
b/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
index cfe1522..e9ea9ba 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
@@ -66,7 +66,10 @@ import org.json.simple.JSONObject;
 
  @NamedQuery(name = "GET_SLA_SUMMARY_EVENTPROCESSED", query = "select 
w.eventProcessed from SLASummaryBean w where w.jobId = :id"),
 
- @NamedQuery(name = "GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED", query = 
"select w.eventProcessed, w.lastModifiedTS from SLASummaryBean w where w.jobId 
= :id")
+ @NamedQuery(name = "GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED",
+         query = "select w.eventProcessed, w.lastModifiedTS from 
SLASummaryBean w where w.jobId = :id"),
+
+ @NamedQuery(name = "GET_SLA_SUMMARY_ALL", query = "select OBJECT(w) from 
SLASummaryBean w")
 
 })
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/store/WorkflowStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/store/WorkflowStore.java 
b/core/src/main/java/org/apache/oozie/store/WorkflowStore.java
index c565e74..821abc5 100644
--- a/core/src/main/java/org/apache/oozie/store/WorkflowStore.java
+++ b/core/src/main/java/org/apache/oozie/store/WorkflowStore.java
@@ -22,7 +22,6 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;

http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/util/db/BasicDataSourceWrapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/util/db/BasicDataSourceWrapper.java 
b/core/src/main/java/org/apache/oozie/util/db/BasicDataSourceWrapper.java
new file mode 100644
index 0000000..c972ea2
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/db/BasicDataSourceWrapper.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util.db;
+
+import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.commons.dbcp.ConnectionFactory;
+import org.apache.commons.dbcp.DriverConnectionFactory;
+import org.apache.commons.dbcp.SQLNestedException;
+
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+public class BasicDataSourceWrapper extends BasicDataSource {
+
+    /**
+     * Fixing a bug within {@link BasicDataSource#createConnectionFactory()} 
for {@code driverClassName} to have real effect.
+     * <p/>
+     * Oozie currently uses Apache Commons DBCP 1.4.0, which has a bug that ignores {@code driverClassName}. As a
+     * result, we are unable to create a JDBC driver from a user-provided driver class name (which we try to do by
+     * explicitly setting {@code openJpa.connectionProperties="DriverClassName=..."}) unless we apply the exact same
+     * fix as the DBCP patch.
+     * <p/>
+     * Note: once DBCP 1.4.1 is released and Oozie is updated to it, this class can be removed.
+     * <p/>
+     * Please see <a href="https://issues.apache.org/jira/browse/DBCP-333">the DBCP bug</a> and
+     * <a href="https://github.com/apache/commons-dbcp/blob/DBCP_1_4_x_BRANCH/src/java/org/apache/commons/dbcp/BasicDataSource.java#L1588-L1660">
+     * the fixed method</a> for details.
+     * <p/>
+     * Please also see how OpenJPA
+     * <a href="http://openjpa.apache.org/builds/2.2.1/apache-openjpa/docs/ref_guide_integration_dbcp.html">is integrated</a>
+     * with DBCP.
+     */
+    protected ConnectionFactory createConnectionFactory() throws SQLException {
+        // Load the JDBC driver class
+        Class driverFromCCL = null;
+        if (driverClassName != null) {
+            try {
+                try {
+                    if (driverClassLoader == null) {
+                        driverFromCCL = Class.forName(driverClassName);
+                    } else {
+                        driverFromCCL = Class.forName(driverClassName, true, 
driverClassLoader);
+                    }
+                } catch (ClassNotFoundException cnfe) {
+                    driverFromCCL = Thread.currentThread(
+                    ).getContextClassLoader().loadClass(
+                            driverClassName);
+                }
+            } catch (Throwable t) {
+                String message = "Cannot load JDBC driver class '" +
+                        driverClassName + "'";
+                logWriter.println(message);
+                t.printStackTrace(logWriter);
+                throw new SQLNestedException(message, t);
+            }
+        }
+
+        // Create a JDBC driver instance
+        Driver driver = null;
+        try {
+            if (driverFromCCL == null) {
+                driver = DriverManager.getDriver(url);
+            } else {
+                // Usage of DriverManager is not possible, as it does not
+                // respect the ContextClassLoader
+                driver = (Driver) driverFromCCL.newInstance();
+                if (!driver.acceptsURL(url)) {
+                    throw new SQLException("No suitable driver", "08001");
+                }
+            }
+        } catch (Throwable t) {
+            String message = "Cannot create JDBC driver of class '" +
+                    (driverClassName != null ? driverClassName : "") +
+                    "' for connect URL '" + url + "'";
+            logWriter.println(message);
+            t.printStackTrace(logWriter);
+            throw new SQLNestedException(message, t);
+        }
+
+        // Can't test without a validationQuery
+        if (validationQuery == null) {
+            setTestOnBorrow(false);
+            setTestOnReturn(false);
+            setTestWhileIdle(false);
+        }
+
+        // Set up the driver connection factory we will use
+        String user = username;
+        if (user != null) {
+            connectionProperties.put("user", user);
+        } else {
+            log("DBCP DataSource configured without a 'username'");
+        }
+
+        String pwd = password;
+        if (pwd != null) {
+            connectionProperties.put("password", pwd);
+        } else {
+            log("DBCP DataSource configured without a 'password'");
+        }
+
+        ConnectionFactory driverConnectionFactory = new 
DriverConnectionFactory(driver, url, connectionProperties);
+        return driverConnectionFactory;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/util/db/DatabaseRetryPredicate.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/util/db/DatabaseRetryPredicate.java 
b/core/src/main/java/org/apache/oozie/util/db/DatabaseRetryPredicate.java
new file mode 100644
index 0000000..c89aabe
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/db/DatabaseRetryPredicate.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util.db;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.base.Predicate;
+
+/**
+ * Base class for predicates that inspect a {@link Throwable} and answer with a boolean; it offers a helper
+ * for looking at the whole cause chain of the throwable.
+ */
+public abstract class DatabaseRetryPredicate implements Predicate<Throwable> {
+
+    @Override
+    public abstract boolean apply(Throwable input);
+
+    /**
+     * Collects the runtime classes of {@code t} and of every throwable reachable from it by calling
+     * {@link Throwable#getCause()} repeatedly.
+     *
+     * @param t the outermost throwable; must not be {@code null}
+     * @return the classes found along the cause chain, always including the class of {@code t} itself
+     */
+    protected Set<Class<?>> getAllExceptions(final Throwable t) {
+        final Set<Class<?>> chainClasses = new HashSet<>();
+        chainClasses.add(t.getClass());
+
+        // Walk down the chain until a throwable without a cause is reached.
+        for (Throwable cause = t.getCause(); cause != null; cause = cause.getCause()) {
+            chainClasses.add(cause.getClass());
+        }
+
+        return chainClasses;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/util/db/FailingConnectionWrapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/util/db/FailingConnectionWrapper.java 
b/core/src/main/java/org/apache/oozie/util/db/FailingConnectionWrapper.java
new file mode 100644
index 0000000..0e31025
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/db/FailingConnectionWrapper.java
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util.db;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Sets;
+import org.apache.directory.api.util.Strings;
+import org.apache.oozie.util.XLog;
+
+import javax.annotation.Nullable;
+import javax.persistence.PersistenceException;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+public class FailingConnectionWrapper implements Connection {
+    private static final XLog LOG = 
XLog.getLog(FailingConnectionWrapper.class);
+
+    private final Connection delegate;
+    private static final RuntimeExceptionInjector<PersistenceException> 
injector =
+            new RuntimeExceptionInjector<>(PersistenceException.class, 5);
+    private static final OozieDmlStatementPredicate oozieDmlStatementPredicate 
=
+            new OozieDmlStatementPredicate();
+
+    public FailingConnectionWrapper(final Connection delegate) throws 
SQLException {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public Statement createStatement() throws SQLException {
+        return delegate.createStatement();
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(final String sql) throws 
SQLException {
+        return delegate.prepareStatement(sql);
+    }
+
+    @Override
+    public CallableStatement prepareCall(final String sql) throws SQLException 
{
+        return delegate.prepareCall(sql);
+    }
+
+    @Override
+    public String nativeSQL(final String sql) throws SQLException {
+        return delegate.nativeSQL(sql);
+    }
+
+    @Override
+    public void setAutoCommit(final boolean autoCommit) throws SQLException {
+        delegate.setAutoCommit(autoCommit);
+    }
+
+    @Override
+    public boolean getAutoCommit() throws SQLException {
+        return delegate.getAutoCommit();
+    }
+
+    @Override
+    public void commit() throws SQLException {
+        delegate.commit();
+    }
+
+    @Override
+    public void rollback() throws SQLException {
+        delegate.rollback();
+    }
+
+    @Override
+    public void close() throws SQLException {
+        delegate.close();
+    }
+
+    @Override
+    public boolean isClosed() throws SQLException {
+        return delegate.isClosed();
+    }
+
+    @Override
+    public DatabaseMetaData getMetaData() throws SQLException {
+        return delegate.getMetaData();
+    }
+
+    @Override
+    public void setReadOnly(final boolean readOnly) throws SQLException {
+        delegate.setReadOnly(readOnly);
+    }
+
+    @Override
+    public boolean isReadOnly() throws SQLException {
+        return delegate.isReadOnly();
+    }
+
+    @Override
+    public void setCatalog(final String catalog) throws SQLException {
+        delegate.setCatalog(catalog);
+    }
+
+    @Override
+    public String getCatalog() throws SQLException {
+        return delegate.getCatalog();
+    }
+
+    @Override
+    public void setTransactionIsolation(final int level) throws SQLException {
+        delegate.setTransactionIsolation(level);
+    }
+
+    @Override
+    public int getTransactionIsolation() throws SQLException {
+        return delegate.getTransactionIsolation();
+    }
+
+    @Override
+    public SQLWarning getWarnings() throws SQLException {
+        return delegate.getWarnings();
+    }
+
+    @Override
+    public void clearWarnings() throws SQLException {
+        delegate.clearWarnings();
+    }
+
+    @Override
+    public Statement createStatement(final int resultSetType, final int 
resultSetConcurrency) throws SQLException {
+        return delegate.createStatement(resultSetType, resultSetConcurrency);
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(final String sql, final int 
resultSetType, final int resultSetConcurrency)
+            throws SQLException {
+        if (oozieDmlStatementPredicate.apply(sql)) {
+            LOG.trace("Injecting random failure. It's a DML statement of an 
Oozie table, preparing this statement might fail.");
+            injector.inject(String.format("Deliberately failing to prepare 
statement. [sql=%s]", sql));
+        }
+
+        LOG.trace("Preparing statement. [sql={0}]", sql);
+        return delegate.prepareStatement(sql, resultSetType, 
resultSetConcurrency);
+    }
+
+    @Override
+    public CallableStatement prepareCall(final String sql, final int 
resultSetType, final int resultSetConcurrency)
+            throws SQLException {
+        return delegate.prepareCall(sql, resultSetType, resultSetConcurrency);
+    }
+
+    @Override
+    public Map<String, Class<?>> getTypeMap() throws SQLException {
+        return delegate.getTypeMap();
+    }
+
+    @Override
+    public void setTypeMap(final Map<String, Class<?>> map) throws 
SQLException {
+        delegate.setTypeMap(map);
+    }
+
+    @Override
+    public void setHoldability(final int holdability) throws SQLException {
+        delegate.setHoldability(holdability);
+    }
+
+    @Override
+    public int getHoldability() throws SQLException {
+        return delegate.getHoldability();
+    }
+
+    @Override
+    public Savepoint setSavepoint() throws SQLException {
+        return delegate.setSavepoint();
+    }
+
+    @Override
+    public Savepoint setSavepoint(final String name) throws SQLException {
+        return delegate.setSavepoint(name);
+    }
+
+    /**
+     * Delegates to the wrapped connection, undoing only the changes made after {@code savepoint} was set.
+     *
+     * @param savepoint the savepoint to roll back to
+     * @throws SQLException if the wrapped connection fails to roll back
+     */
+    @Override
+    public void rollback(final Savepoint savepoint) throws SQLException {
+        // Must forward the savepoint: the no-arg rollback() would discard the whole transaction.
+        delegate.rollback(savepoint);
+    }
+
+    @Override
+    public void releaseSavepoint(final Savepoint savepoint) throws 
SQLException {
+        delegate.releaseSavepoint(savepoint);
+    }
+
+    @Override
+    public Statement createStatement(final int resultSetType, final int 
resultSetConcurrency, final int resultSetHoldability)
+            throws SQLException {
+        return delegate.createStatement(resultSetType, resultSetConcurrency, 
resultSetHoldability);
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(final String sql, final int 
resultSetType, final int resultSetConcurrency,
+                                              final int resultSetHoldability) 
throws SQLException {
+        return delegate.prepareStatement(sql, resultSetType, 
resultSetConcurrency, resultSetHoldability);
+    }
+
+    @Override
+    public CallableStatement prepareCall(final String sql, final int 
resultSetType, final int resultSetConcurrency,
+                                         final int resultSetHoldability) 
throws SQLException {
+        return delegate.prepareCall(sql, resultSetType, resultSetConcurrency, 
resultSetHoldability);
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(final String sql, final int 
autoGeneratedKeys) throws SQLException {
+        return delegate.prepareStatement(sql, autoGeneratedKeys);
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(final String sql, final int[] 
columnIndexes) throws SQLException {
+        return delegate.prepareStatement(sql, columnIndexes);
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(final String sql, final String[] 
columnNames) throws SQLException {
+        return delegate.prepareStatement(sql, columnNames);
+    }
+
+    @Override
+    public Clob createClob() throws SQLException {
+        return delegate.createClob();
+    }
+
+    @Override
+    public Blob createBlob() throws SQLException {
+        return delegate.createBlob();
+    }
+
+    @Override
+    public NClob createNClob() throws SQLException {
+        return delegate.createNClob();
+    }
+
+    @Override
+    public SQLXML createSQLXML() throws SQLException {
+        return delegate.createSQLXML();
+    }
+
+    @Override
+    public boolean isValid(final int timeout) throws SQLException {
+        return delegate.isValid(timeout);
+    }
+
+    @Override
+    public void setClientInfo(final String name, final String value) throws 
SQLClientInfoException {
+        delegate.setClientInfo(name, value);
+    }
+
+    @Override
+    public void setClientInfo(final Properties properties) throws 
SQLClientInfoException {
+        delegate.setClientInfo(properties);
+    }
+
+    @Override
+    public String getClientInfo(final String name) throws SQLException {
+        return delegate.getClientInfo(name);
+    }
+
+    @Override
+    public Properties getClientInfo() throws SQLException {
+        return delegate.getClientInfo();
+    }
+
+    @Override
+    public Array createArrayOf(final String typeName, final Object[] elements) 
throws SQLException {
+        return delegate.createArrayOf(typeName, elements);
+    }
+
+    @Override
+    public Struct createStruct(final String typeName, final Object[] 
attributes) throws SQLException {
+        return delegate.createStruct(typeName, attributes);
+    }
+
+    @Override
+    public void setSchema(final String schema) throws SQLException {
+        delegate.setSchema(schema);
+    }
+
+    @Override
+    public String getSchema() throws SQLException {
+        return delegate.getSchema();
+    }
+
+    @Override
+    public void abort(final Executor executor) throws SQLException {
+        delegate.abort(executor);
+    }
+
+    @Override
+    public void setNetworkTimeout(final Executor executor, final int 
milliseconds) throws SQLException {
+        delegate.setNetworkTimeout(executor, milliseconds);
+    }
+
+    @Override
+    public int getNetworkTimeout() throws SQLException {
+        return delegate.getNetworkTimeout();
+    }
+
+    @Override
+    public <T> T unwrap(final Class<T> iface) throws SQLException {
+        return delegate.unwrap(iface);
+    }
+
+    @Override
+    public boolean isWrapperFor(final Class<?> iface) throws SQLException {
+        return delegate.isWrapperFor(iface);
+    }
+
+    /**
+     * Decides whether a SQL string is a DML statement that refers to one of the Oozie tables.
+     * <p>
+     * A statement matches when, compared case-insensitively, it starts with one of the entries of
+     * {@code DML_PREFIXES} and contains one of the entries of {@code OOZIE_TABLE_NAMES} anywhere in its text.
+     */
+    static class OozieDmlStatementPredicate implements Predicate<String> {
+        private static final Set<String> DML_PREFIXES = Sets.newHashSet(
+                "SELECT ", "INSERT INTO ", "UPDATE ", "DELETE FROM ");
+        private static final Set<String> OOZIE_TABLE_NAMES = Sets.newHashSet(
+                "BUNDLE_ACTIONS", "BUNDLE_JOBS", "COORD_ACTIONS", "COORD_JOBS", "SLA_REGISTRATION", "SLA_SUMMARY",
+                "WF_ACTIONS", "WF_JOBS");
+
+        /**
+         * @param input the SQL text to classify; rejected by the precondition check when it is not a non-empty string
+         * @return {@code true} if {@code input} starts with a DML prefix and mentions an Oozie table name
+         */
+        @Override
+        public boolean apply(@Nullable String input) {
+            Preconditions.checkArgument(Strings.isNotEmpty(input));
+
+            // Upper-case once and with a fixed locale: with the default locale, letters such as 'i' may map to
+            // characters other than 'I' (e.g. on Turkish systems), so prefixes and table names would not match.
+            final String upperCased = input.toUpperCase(java.util.Locale.ROOT);
+
+            return startsWithAny(upperCased, DML_PREFIXES) && containsAny(upperCased, OOZIE_TABLE_NAMES);
+        }
+
+        /** Returns {@code true} as soon as {@code text} starts with one of {@code prefixes}. */
+        private static boolean startsWithAny(final String text, final Set<String> prefixes) {
+            for (final String prefix : prefixes) {
+                if (text.startsWith(prefix)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        /** Returns {@code true} as soon as {@code text} contains one of {@code fragments}. */
+        private static boolean containsAny(final String text, final Set<String> fragments) {
+            for (final String fragment : fragments) {
+                if (text.contains(fragment)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/util/db/FailingHSQLDBDriverWrapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/util/db/FailingHSQLDBDriverWrapper.java 
b/core/src/main/java/org/apache/oozie/util/db/FailingHSQLDBDriverWrapper.java
new file mode 100644
index 0000000..fe9f08b
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/util/db/FailingHSQLDBDriverWrapper.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util.db;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * HSQLDB JDBC driver that can hand out {@link FailingConnectionWrapper} instances instead of plain connections.
+ * <p/>
+ * Wrapping is switched on by setting the system property named by {@link #USE_FAILING_DRIVER} to {@code true};
+ * when the property is absent or has any other value, the connection of the parent driver is returned unchanged.
+ */
+public class FailingHSQLDBDriverWrapper extends org.hsqldb.jdbcDriver {
+
+    public static final String USE_FAILING_DRIVER = "oozie.sql.use.failing.driver";
+
+    /**
+     * @param url  the JDBC URL, passed on to the parent driver
+     * @param info the connection properties, passed on to the parent driver
+     * @return the parent driver's connection, wrapped in a {@link FailingConnectionWrapper} when the
+     *         {@link #USE_FAILING_DRIVER} system property is {@code true}; {@code null} if the parent returns
+     *         {@code null}
+     * @throws SQLException if the parent driver fails to connect
+     */
+    @Override
+    public Connection connect(final String url,
+                              final Properties info) throws SQLException {
+        final Connection connection = super.connect(url, info);
+
+        // java.sql.Driver#connect returns null for URLs the driver does not handle. That null has to reach
+        // the caller as-is; wrapping it would produce a non-null connection around a null delegate.
+        if (connection == null) {
+            return null;
+        }
+
+        if (Boolean.parseBoolean(System.getProperty(USE_FAILING_DRIVER, "false"))) {
+            return new FailingConnectionWrapper(connection);
+        }
+
+        return connection;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/util/db/FailingMySQLDriverWrapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/util/db/FailingMySQLDriverWrapper.java 
b/core/src/main/java/org/apache/oozie/util/db/FailingMySQLDriverWrapper.java
new file mode 100644
index 0000000..f7af57c
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/db/FailingMySQLDriverWrapper.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util.db;
+
+import com.mysql.jdbc.Driver;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * Used for runtime database error injection on MySQL.
+ * <p/>
+ * Necessary steps:
+ * <ul>
+ *     <li>set {@code oozie.service.JPAService.connection.data.source} to
+ *     {@code org.apache.oozie.util.db.BasicDataSourceWrapper} within {@code oozie-site.xml}</li>
+ *     <li>set {@code oozie.service.JPAService.jdbc.driver} to
+ *     {@code org.apache.oozie.util.db.FailingMySQLDriverWrapper} within {@code oozie-site.xml}</li>
+ *     <li>restart Oozie server</li>
+ *     <li>submit / start some workflows, coordinators etc.</li>
+ *     <li>see any of those {@code JPAException} instances with following message prefix:
+ *     {@code Deliberately failing to prepare statement.}</li>
+ * </ul>
+ */
+public class FailingMySQLDriverWrapper extends Driver {
+    public FailingMySQLDriverWrapper() throws SQLException {
+        super();
+    }
+
+    /**
+     * @param url  the JDBC URL, passed on to the MySQL driver
+     * @param info the connection properties, passed on to the MySQL driver
+     * @return the MySQL driver's connection wrapped in a {@link FailingConnectionWrapper}, or {@code null} if the
+     *         MySQL driver returns {@code null}
+     * @throws SQLException if the MySQL driver fails to connect
+     */
+    @Override
+    public Connection connect(final String url,
+                              final Properties info) throws SQLException {
+        final Connection connection = super.connect(url, info);
+
+        // java.sql.Driver#connect returns null for URLs the driver does not handle. That null has to reach
+        // the caller as-is; wrapping it would produce a non-null connection around a null delegate.
+        if (connection == null) {
+            return null;
+        }
+
+        return new FailingConnectionWrapper(connection);
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/util/db/OperationRetryHandler.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/util/db/OperationRetryHandler.java 
b/core/src/main/java/org/apache/oozie/util/db/OperationRetryHandler.java
new file mode 100644
index 0000000..16a0a82
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/db/OperationRetryHandler.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util.db;
+
+import java.util.concurrent.Callable;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.oozie.util.XLog;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+
+public class OperationRetryHandler {
+    private static XLog LOG = XLog.getLog(OperationRetryHandler.class);
+    @VisibleForTesting
+    static final RetryAttemptState RETRY_ATTEMPT_STATE = new 
RetryAttemptState();
+
+    private final int maxRetryCount;
+    private final long initialWaitTime;
+    private final long maxWaitTime;
+    private final Predicate<Throwable> retryPredicate;
+    private final boolean shouldRetry;
+
+    public OperationRetryHandler(final int maxRetryCount, final long 
initialWaitTime, final long maxWaitTime,
+                                 final Predicate<Throwable> retryPredicate) {
+        Preconditions.checkArgument(maxRetryCount >= 0, "Retry count must not 
be less than zero");
+        Preconditions.checkArgument(initialWaitTime > 0, "Initial wait time 
must be greater than zero");
+        Preconditions.checkArgument(maxWaitTime >= 0, "Maximum wait time must 
not be less than zero");
+
+        this.maxRetryCount = maxRetryCount;
+        this.initialWaitTime = initialWaitTime;
+        this.maxWaitTime = maxWaitTime;
+        this.retryPredicate = Preconditions.checkNotNull(retryPredicate, 
"Retry predicate must not be null");
+        this.shouldRetry = !(maxRetryCount == 0 || maxWaitTime == 0);
+
+        LOG.trace("Retry handler parameters are set." +
+                
"[maxRetryCount={0};initialWaitTime={1};maxWaitTime={2};retryPredicate.class={3};shouldRetry={4}]",
+                this.maxRetryCount, this.initialWaitTime, this.maxWaitTime, 
this.retryPredicate.getClass().getName(), shouldRetry);
+    }
+
+    public <V> V executeWithRetry(final Callable<V> operation) throws 
Exception {
+        int retries = 0;
+        long waitTime = initialWaitTime;
+        Exception lastException = null;
+
+        if (!shouldRetry) {
+            try {
+                LOG.trace("Configured not to retry, calling operation once.");
+
+                final V result = operation.call();
+
+                LOG.trace("Operation called once successfully.");
+
+                return result;
+            }
+            catch (final Exception e) {
+                LOG.error("An error occurred while calling the operation once. 
[e.message={0}]", e.getMessage());
+                throw e;
+            }
+        }
+
+        try {
+            RETRY_ATTEMPT_STATE.signalStart();
+
+            while (retries < maxRetryCount) {
+                try {
+                    LOG.trace("Calling operation. [retries={0}]", retries);
+
+                    retries++;
+                    final V result = operation.call();
+
+                    LOG.trace("Operation called successfully.");
+
+                    return result;
+                } catch (final Exception e) {
+                    LOG.warn("Database error", e);
+
+                    // if retries have been done by an inner retry handler,
+                    // then we won't make any effort to do it again
+                    if (RETRY_ATTEMPT_STATE.isExhausted()) {
+                        LOG.error("Retry attempts have been exhausted. 
[e.message={0}]", e.getMessage());
+                        throw e;
+                    }
+
+                    if (retryPredicate.apply(e)) {
+                        LOG.trace("Exception is not on blacklist, handling 
retry. [retries={0};e.class={1}]",
+                                retries, e.getClass().getName());
+                        waitTime = handleRetry(waitTime, retries);
+                        lastException = e;
+                    }
+                    else {
+                        LOG.warn("Exception is on blacklist, not handling 
retry. [retries={0};e.class={1}]",
+                                retries, e.getClass().getName());
+                        throw e;
+                    }
+                }
+            }
+
+            LOG.error("Number of maximum retry attempts exhausted");
+            RETRY_ATTEMPT_STATE.signalExhausted(); // signal to possible outer 
retry handlers
+            throw lastException;
+        } finally {
+            RETRY_ATTEMPT_STATE.signalEnd();
+        }
+    }
+
+    private long handleRetry(long sleepBeforeRetryMs, final int retries) 
throws InterruptedException {
+        LOG.warn("Operation failed, sleeping {0} milliseconds before retry 
#{1}", sleepBeforeRetryMs, retries);
+        Thread.sleep(sleepBeforeRetryMs);
+        sleepBeforeRetryMs *=  2;
+
+        return sleepBeforeRetryMs > maxWaitTime ? maxWaitTime : 
sleepBeforeRetryMs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/util/db/PersistenceExceptionSubclassFilterRetryPredicate.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/util/db/PersistenceExceptionSubclassFilterRetryPredicate.java
 
b/core/src/main/java/org/apache/oozie/util/db/PersistenceExceptionSubclassFilterRetryPredicate.java
new file mode 100644
index 0000000..b742ca7
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/util/db/PersistenceExceptionSubclassFilterRetryPredicate.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util.db;
+
+import com.google.common.collect.Sets;
+import org.apache.oozie.util.XLog;
+
+import javax.persistence.EntityExistsException;
+import javax.persistence.EntityNotFoundException;
+import javax.persistence.LockTimeoutException;
+import javax.persistence.NoResultException;
+import javax.persistence.NonUniqueResultException;
+import javax.persistence.OptimisticLockException;
+import javax.persistence.PersistenceException;
+import javax.persistence.PessimisticLockException;
+import javax.persistence.QueryTimeoutException;
+import javax.persistence.TransactionRequiredException;
+import java.util.Set;
+
+/**
+ * A {@link DatabaseRetryPredicate} that accepts a failure for retry unless the {@link Throwable}, or any of the
+ * exception classes reported for it by {@code getAllExceptions}, is one of the blacklisted
+ * {@link PersistenceException} subclasses or a subclass of those.
+ * <p/>
+ * Blacklisted exceptions in this class do not indicate a network failure, therefore no retry should take place.
+ */
+public class PersistenceExceptionSubclassFilterRetryPredicate extends DatabaseRetryPredicate {
+    private static final XLog LOG = XLog.getLog(PersistenceExceptionSubclassFilterRetryPredicate.class);
+    private static final Set<Class<? extends PersistenceException>> BLACKLIST = Sets.newHashSet(
+            EntityExistsException.class,
+            EntityNotFoundException.class,
+            LockTimeoutException.class,
+            NoResultException.class,
+            NonUniqueResultException.class,
+            OptimisticLockException.class,
+            PessimisticLockException.class,
+            QueryTimeoutException.class,
+            TransactionRequiredException.class
+    );
+
+    /**
+     * @param throwable the failure to examine
+     * @return {@code true} if none of the examined exception classes is blacklisted, {@code false} otherwise
+     */
+    @Override
+    public boolean apply(final Throwable throwable) {
+        LOG.trace("Retry predicate investigation started. [throwable.class={0}]", throwable.getClass().getName());
+
+        final boolean applies = !hasBlacklistedException(throwable);
+
+        LOG.trace("Retry predicate investigation finished. [applies={0}]", applies);
+
+        return applies;
+    }
+
+    /** Tells whether any exception class collected for {@code throwable} is assignable to a blacklist entry. */
+    private boolean hasBlacklistedException(final Throwable throwable) {
+        for (final Class<?> candidate : getAllExceptions(throwable)) {
+            for (final Class<? extends PersistenceException> blacklisted : BLACKLIST) {
+                if (blacklisted.isAssignableFrom(candidate)) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/util/db/RetryAttemptState.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/db/RetryAttemptState.java 
b/core/src/main/java/org/apache/oozie/util/db/RetryAttemptState.java
new file mode 100644
index 0000000..0413ee0
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/db/RetryAttemptState.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util.db;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * This class tracks nested {@link OperationRetryHandler} calls. Some {@code JPAExecutor} implementations call other
+ * {@code JPAExecutor}s. This results in two (or possibly more)
+ * {@link OperationRetryHandler#executeWithRetry(Callable)} calls.
+ * <p/>
+ * If the innermost retry handler has exhausted all attempts and rethrows the {@link Exception}, the outer handler
+ * catches that and would restart the JPA operation again.
+ * <p/>
+ * In order to avoid this, retry handlers must communicate with each other on the same thread by incrementing /
+ * decrementing the nesting level and signalling whether the maximum number of attempts have been reached.
+ * <p/>
+ * We use {@link ThreadLocal}s because retry handlers might be called from different threads in parallel. If the
+ * nesting level is zero, it's important to reset the {@link #exhausted} back to {@code false} since this variable
+ * is reused in the thread pool.
+ */
+final class RetryAttemptState {
+    private final ThreadLocal<Boolean> exhausted = new ThreadLocal<Boolean>() {
+        @Override
+        protected Boolean initialValue() {
+            return Boolean.FALSE;
+        }
+    };
+
+    private final ThreadLocal<Integer> inProgressCount = new ThreadLocal<Integer>() {
+        @Override
+        protected Integer initialValue() {
+            return 0;
+        }
+    };
+
+    /**
+     * Registers the start of a retry handler invocation on the current thread by raising the nesting level.
+     *
+     * @throws IllegalStateException if the current thread is already marked as exhausted; the nesting level is
+     *                               left untouched in that case
+     */
+    void signalStart() {
+        Preconditions.checkState(!isExhausted(), "retry attempts exhausted");
+
+        inProgressCount.set(inProgressCount.get() + 1);
+    }
+
+    /**
+     * Registers the end of a retry handler invocation on the current thread. When the outermost invocation ends,
+     * the exhausted flag is cleared.
+     */
+    void signalEnd() {
+        // Never let the level drop below zero: after an unbalanced call a negative level would never hit zero
+        // at the end of later invocations, so the exhausted flag could not be cleared on this thread anymore.
+        final int currentLevel = Math.max(inProgressCount.get() - 1, 0);
+        inProgressCount.set(currentLevel);
+        if (currentLevel == 0) {
+            // state must be reset
+            exhausted.set(false);
+        }
+    }
+
+    /** Marks the current thread as having used up all retry attempts. */
+    void signalExhausted() {
+        exhausted.set(true);
+    }
+
+    /** @return whether the current thread is marked as having used up all retry attempts */
+    boolean isExhausted() {
+        return exhausted.get();
+    }
+
+    /** @return the current nesting level of retry handler invocations on this thread */
+    @VisibleForTesting
+    int getInProgressCount() {
+        return inProgressCount.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/java/org/apache/oozie/util/db/RuntimeExceptionInjector.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/util/db/RuntimeExceptionInjector.java 
b/core/src/main/java/org/apache/oozie/util/db/RuntimeExceptionInjector.java
new file mode 100644
index 0000000..2b55e85
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/db/RuntimeExceptionInjector.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util.db;
+
+import com.google.common.base.Preconditions;
+import org.apache.oozie.util.XLog;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class RuntimeExceptionInjector<E extends RuntimeException> {
+    private static final XLog LOG = 
XLog.getLog(RuntimeExceptionInjector.class);
+    private static final AtomicLong failureCounter = new AtomicLong(0);
+
+    private final Class<E> runtimeExceptionClass;
+    private final int failurePercent;
+
+    public RuntimeExceptionInjector(final Class<E> runtimeExceptionClass, 
final int failurePercent) {
+        Preconditions.checkArgument(failurePercent <= 100 && failurePercent >= 
0,
+                "illegal value for failure %: " + failurePercent);
+
+        this.runtimeExceptionClass = runtimeExceptionClass;
+        this.failurePercent = failurePercent;
+    }
+
+    public void inject(final String errorMessage) {
+        LOG.trace("Trying to inject random failure. [errorMessage={0}]", 
errorMessage);
+
+        final ThreadLocalRandom random = ThreadLocalRandom.current();
+        final int randomVal = random.nextInt(0, 100); // range:  [0..99]
+
+        if (randomVal < failurePercent) {
+            final long count = failureCounter.incrementAndGet();
+            LOG.warn("Injecting random failure. 
[runtimeExceptionClass.name={0};count={1};errorMessage={2}]",
+                    runtimeExceptionClass.getName(), count, errorMessage);
+            E injected;
+
+            try {
+                injected = 
runtimeExceptionClass.getConstructor(String.class).newInstance(
+                        "injected random failure #" + count + " ." + 
errorMessage);
+            } catch (final InstantiationException | IllegalAccessException | 
InvocationTargetException
+                    | NoSuchMethodException outer) {
+                try {
+                    LOG.warn("Instantiating without error message. 
[runtimeExceptionClass.name={0};outer.message={1}]",
+                            runtimeExceptionClass.getName(), 
outer.getMessage());
+                    injected = runtimeExceptionClass.newInstance();
+                } catch (final InstantiationException | IllegalAccessException 
inner) {
+                    LOG.error("Could not instantiate. 
[runtimeExceptionClass.name={0};inner.message={1}]",
+                            runtimeExceptionClass.getName(), 
inner.getMessage());
+                    throw new RuntimeException(inner);
+                }
+
+            }
+
+            throw injected;
+        }
+
+        LOG.trace("Did not inject random failure.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/a54f7c20/core/src/main/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/META-INF/persistence.xml 
b/core/src/main/resources/META-INF/persistence.xml
index bad9278..a74078a 100644
--- a/core/src/main/resources/META-INF/persistence.xml
+++ b/core/src/main/resources/META-INF/persistence.xml
@@ -43,7 +43,7 @@
         <class>org.apache.oozie.util.db.ValidateConnectionBean</class>
 
         <properties>
-            <property name="openjpa.ConnectionDriverName" 
value="org.apache.commons.dbcp.BasicDataSource"/>
+            <property name="openjpa.ConnectionDriverName" 
value="org.apache.oozie.util.db.BasicDataSourceWrapper"/>
 
             <property name="openjpa.ConnectionProperties" 
value="**INVALID**"/> <!--Set by StoreService at init time -->
 
@@ -102,7 +102,7 @@
         <class>org.apache.oozie.util.db.ValidateConnectionBean</class>
 
         <properties>
-            <property name="openjpa.ConnectionDriverName" 
value="org.apache.commons.dbcp.BasicDataSource"/>
+            <property name="openjpa.ConnectionDriverName" 
value="org.apache.oozie.util.db.BasicDataSourceWrapper"/>
 
             <property name="openjpa.ConnectionProperties" 
value="**INVALID**"/> <!--Set by StoreService at init time -->
 
@@ -163,7 +163,7 @@
         <class>org.apache.oozie.util.db.ValidateConnectionBean</class>
 
         <properties>
-            <property name="openjpa.ConnectionDriverName" 
value="org.apache.commons.dbcp.BasicDataSource"/>
+            <property name="openjpa.ConnectionDriverName" 
value="org.apache.oozie.util.db.BasicDataSourceWrapper"/>
 
             <property name="openjpa.ConnectionProperties" 
value="**INVALID**"/> <!--Set by StoreService at init time -->
 
@@ -222,7 +222,7 @@
         <class>org.apache.oozie.util.db.ValidateConnectionBean</class>
 
         <properties>
-            <property name="openjpa.ConnectionDriverName" 
value="org.apache.commons.dbcp.BasicDataSource"/>
+            <property name="openjpa.ConnectionDriverName" 
value="org.apache.oozie.util.db.BasicDataSourceWrapper"/>
 
             <property name="openjpa.ConnectionProperties" 
value="**INVALID**"/> <!--Set by StoreService at init time -->
 
@@ -282,7 +282,7 @@
         <class>org.apache.oozie.util.db.ValidateConnectionBean</class>
 
         <properties>
-            <property name="openjpa.ConnectionDriverName" 
value="org.apache.commons.dbcp.BasicDataSource"/>
+            <property name="openjpa.ConnectionDriverName" 
value="org.apache.oozie.util.db.BasicDataSourceWrapper"/>
 
             <property name="openjpa.ConnectionProperties" 
value="**INVALID**"/> <!--Set by StoreService at init time -->
 
@@ -342,7 +342,7 @@
         <class>org.apache.oozie.util.db.ValidateConnectionBean</class>
 
         <properties>
-            <property name="openjpa.ConnectionDriverName" 
value="org.apache.commons.dbcp.BasicDataSource"/>
+            <property name="openjpa.ConnectionDriverName" 
value="org.apache.oozie.util.db.BasicDataSourceWrapper"/>
 
             <property name="openjpa.ConnectionProperties" 
value="**INVALID**"/> <!--Set by StoreService at init time -->
 

Reply via email to