Author: orudyy
Date: Wed Mar 25 10:57:13 2015
New Revision: 1669092
URL: http://svn.apache.org/r1669092
Log:
QPID-6464: Set replica consistency policy to 'NoConsistencyPolicy' in order to
avoid hanging for timeout specified in TimeConsistencyPolicy on creation of JE
transaction after transition from Master into Detached state when HA claster
has no majority but the remaining Master change configuration tasks attempted
to execute
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1669092&r1=1669091&r2=1669092&view=diff
==============================================================================
---
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
(original)
+++
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
Wed Mar 25 10:57:13 2015
@@ -165,7 +165,14 @@ public class ReplicatedEnvironmentFacade
*/
put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
- put(ReplicationConfig.CONSISTENCY_POLICY, "TimeConsistencyPolicy(1
s,30 s)");
+ /**
+ * Allow Replica to proceed with transactions regardless of the state
of a Replica
+ * At the moment we do not read or write databases on Replicas.
+ * Setting consistency policy to NoConsistencyRequiredPolicy
+ * would allow to create transaction on Replica immediately.
+ * Any followed write operation would fail with ReplicaWriteException.
+ */
+ put(ReplicationConfig.CONSISTENCY_POLICY,
NoConsistencyRequiredPolicy.NAME);
}});
public static final String PERMITTED_NODE_LIST = "permittedNodes";
@@ -402,6 +409,14 @@ public class ReplicatedEnvironmentFacade
return new
ConnectionScopedRuntimeException(String.format("Environment '%s' cannot finish
JE operation because master is unknown", getNodeName()), dbe);
}
+ if (dbe instanceof ReplicaWriteException || dbe instanceof
ReplicaConsistencyException)
+ {
+ // Master transited into Detached/Replica but underlying
Configured Object has not been notified yet
+ // and attempted to perform JE operation.
+ // We need to abort any ongoing JE operation without halting
the Broker or VHN/VH
+ return new
ConnectionScopedRuntimeException(String.format("Environment '%s' cannot finish
JE operation because node is not master", getNodeName()), dbe);
+ }
+
boolean restart = (noMajority || dbe instanceof
RestartRequiredException);
if (restart)
{
Modified:
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java?rev=1669092&r1=1669091&r2=1669092&view=diff
==============================================================================
---
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
(original)
+++
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
Wed Mar 25 10:57:13 2015
@@ -32,7 +32,10 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -48,6 +51,7 @@ import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.rep.NodeState;
+import com.sleepycat.je.rep.ReplicaWriteException;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicatedEnvironment.State;
import com.sleepycat.je.rep.ReplicationConfig;
@@ -822,6 +826,68 @@ public class ReplicatedEnvironmentFacade
node2.close();
}
+ public void testReplicaTransactionBeginsImmediately() throws Exception
+ {
+ ReplicatedEnvironmentFacade master = createMaster();
+ String nodeName2 = TEST_NODE_NAME + "_2";
+ String host = "localhost";
+ int port = _portHelper.getNextAvailable();
+ String node2NodeHostPort = host + ":" + port;
+
+ final ReplicatedEnvironmentFacade replica = createReplica(nodeName2,
node2NodeHostPort, new NoopReplicationGroupListener() );
+
+ // close the master
+ master.close();
+
+ // try to create a transaction in a separate thread
+ // and make sure that transaction is created immediately.
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ try
+ {
+
+ Future<Transaction> future = service.submit(new
Callable<Transaction>(){
+
+ @Override
+ public Transaction call() throws Exception
+ {
+ return replica.getEnvironment().beginTransaction(null,
null);
+ }
+ });
+ Transaction transaction = future.get(5, TimeUnit.SECONDS);
+ assertNotNull("Transaction was not created during expected time",
transaction);
+ transaction.abort();
+ }
+ finally
+ {
+ service.shutdown();
+ }
+ }
+
+ public void
testReplicaWriteExceptionIsConvertedIntoConnectionScopedRuntimeException()
throws Exception
+ {
+ ReplicatedEnvironmentFacade master = createMaster();
+ String nodeName2 = TEST_NODE_NAME + "_2";
+ String host = "localhost";
+ int port = _portHelper.getNextAvailable();
+ String node2NodeHostPort = host + ":" + port;
+
+ final ReplicatedEnvironmentFacade replica = createReplica(nodeName2,
node2NodeHostPort, new NoopReplicationGroupListener() );
+
+ // close the master
+ master.close();
+
+ try
+ {
+ replica.openDatabase("test",
DatabaseConfig.DEFAULT.setAllowCreate(true) );
+ fail("Replica write operation should fail");
+ }
+ catch(ReplicaWriteException e)
+ {
+ RuntimeException handledException =
master.handleDatabaseException("test", e);
+ assertTrue("Unexpected exception", handledException instanceof
ConnectionScopedRuntimeException);
+ }
+ }
+
private void putRecord(final ReplicatedEnvironmentFacade master, final
Database db, final int keyValue,
final String dataValue)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]