http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitDisabledTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitDisabledTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitDisabledTest.java
deleted file mode 100644
index e669f04..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitDisabledTest.java
+++ /dev/null
@@ -1,1446 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache30;
-
-import com.gemstone.gemfire.SystemFailure;
-import com.gemstone.gemfire.cache.*;
-import com.gemstone.gemfire.cache.Region.Entry;
-import com.gemstone.gemfire.cache.util.*;
-import com.gemstone.gemfire.distributed.internal.*;
-import com.gemstone.gemfire.internal.tcp.Connection;
-import dunit.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Test to make sure slow receiver queuing is working
- *
- * @author darrel
- * @since 4.2.1
- */
-public class SlowRecDUnitDisabledTest extends CacheTestCase {
-
- public SlowRecDUnitDisabledTest(String name) {
- super(name);
- }
-
- // this test has special config of its distributed system so
- // the setUp and tearDown methods need to make sure we don't
- // use the ds from previous test and that we don't leave ours around
- // for the next test to use.
-
- public void setUp() throws Exception {
- try {
- disconnectAllFromDS();
- } finally {
- super.setUp();
- }
- }
- public void tearDown2() throws Exception {
- try {
- super.tearDown2();
- } finally {
- disconnectAllFromDS();
- }
- }
-
- ////////////////////// Test Methods //////////////////////
-
- private VM getOtherVm() {
- Host host = Host.getHost(0);
- return host.getVM(0);
- }
-
- static protected Object lastCallback = null;
-
- private void doCreateOtherVm(final Properties p, final boolean addListener) {
- VM vm = getOtherVm();
- vm.invoke(new CacheSerializableRunnable("create root") {
- public void run2() throws CacheException {
- getSystem(p);
- createAckRegion(true, false);
- AttributesFactory af = new AttributesFactory();
- af.setScope(Scope.DISTRIBUTED_NO_ACK);
- af.setDataPolicy(DataPolicy.REPLICATE);
- if (addListener) {
- CacheListener cl = new CacheListenerAdapter() {
- public void afterUpdate(EntryEvent event) {
- // make the slow receiver event slower!
- try {Thread.sleep(500);} catch (InterruptedException shuttingDown) {fail("interrupted");}
- }
- };
- af.setCacheListener(cl);
- } else {
- CacheListener cl = new CacheListenerAdapter() {
- public void afterCreate(EntryEvent event) {
-// getLogWriter().info("afterCreate " + event.getKey());
- if (event.getCallbackArgument() != null) {
- lastCallback = event.getCallbackArgument();
- }
- if (event.getKey().equals("sleepkey")) {
- int sleepMs = ((Integer)event.getNewValue()).intValue();
-// getLogWriter().info("sleepkey sleeping for " + sleepMs);
- try {Thread.sleep(sleepMs);} catch (InterruptedException ignore) {fail("interrupted");}
- }
- }
- public void afterUpdate(EntryEvent event) {
-// getLogWriter().info("afterUpdate " + event.getKey());
- if (event.getCallbackArgument() != null) {
- lastCallback = event.getCallbackArgument();
- }
- if (event.getKey().equals("sleepkey")) {
- int sleepMs = ((Integer)event.getNewValue()).intValue();
-// getLogWriter().info("sleepkey sleeping for " + sleepMs);
- try {Thread.sleep(sleepMs);} catch (InterruptedException ignore) {fail("interrupted");}
- }
- }
- public void afterInvalidate(EntryEvent event) {
- if (event.getCallbackArgument() != null) {
- lastCallback = event.getCallbackArgument();
- }
- }
- public void afterDestroy(EntryEvent event) {
- if (event.getCallbackArgument() != null) {
- lastCallback = event.getCallbackArgument();
- }
- }
- };
- af.setCacheListener(cl);
- }
- Region r1 = createRootRegion("slowrec", af.create());
- // place holder so we receive updates
- r1.create("key", "value");
- }
- });
- }
- static protected final String CHECK_INVALID = "CHECK_INVALID";
-
- private void checkLastValueInOtherVm(final String lastValue, final Object lcb) {
- VM vm = getOtherVm();
- vm.invoke(new CacheSerializableRunnable("check last value") {
- public void run2() throws CacheException {
- Region r1 = getRootRegion("slowrec");
- if (lcb != null) {
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return lcb.equals(lastCallback);
- }
- public String description() {
- return "waiting for callback";
- }
- };
- DistributedTestCase.waitForCriterion(ev, 50 * 1000, 200, true);
- assertEquals(lcb, lastCallback);
- }
- if (lastValue == null) {
- final Region r = r1;
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return r.getEntry("key") == null;
- }
- public String description() {
- return "waiting for key to become null";
- }
- };
- DistributedTestCase.waitForCriterion(ev, 50 * 1000, 200, true);
- assertEquals(null, r1.getEntry("key"));
- } else if (CHECK_INVALID.equals(lastValue)) {
- // should be invalid
- {
- final Region r = r1;
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- Entry e = r.getEntry("key");
- if (e == null) {
- return false;
- }
- return e.getValue() == null;
- }
- public String description() {
- return "waiting for invalidate";
- }
- };
- DistributedTestCase.waitForCriterion(ev, 50 * 1000, 200, true);
-// assertNotNull(re);
-// assertEquals(null, value);
- }
- } else {
- {
- int retryCount = 1000;
- Region.Entry re = null;
- Object value = null;
- while (retryCount-- > 0) {
- re = r1.getEntry("key");
- if (re != null) {
- value = re.getValue();
- if (value != null && value.equals(lastValue)) {
- break;
- }
- }
- try {Thread.sleep(50);} catch (InterruptedException ignore) {fail("interrupted");}
- }
- assertNotNull(re);
- assertNotNull(value);
- assertEquals(lastValue, value);
- }
- }
- }
- });
- }
-
- private void forceQueueFlush() {
- Connection.FORCE_ASYNC_QUEUE=false;
- final DMStats stats = getSystem().getDistributionManager().getStats();
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return stats.getAsyncThreads() == 0;
- }
- public String description() {
- return "Waiting for async threads to disappear";
- }
- };
- DistributedTestCase.waitForCriterion(ev, 10 * 1000, 200, true);
- }
-
- private void forceQueuing(final Region r) throws CacheException {
- Connection.FORCE_ASYNC_QUEUE=true;
- final DMStats stats = getSystem().getDistributionManager().getStats();
- r.put("forcekey", "forcevalue");
-
- // wait for the flusher to get its first flush in progress
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return stats.getAsyncQueueFlushesInProgress() != 0;
- }
- public String description() {
- return "waiting for flushes to start";
- }
- };
- DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
- }
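These two helpers are meant to bracket a test body: forceQueuing turns on Connection.FORCE_ASYNC_QUEUE and waits for the first flush to start, and forceQueueFlush clears the flag and waits for the async threads to drain. A condensed sketch of that pattern, taken from testNoAck below (r stands for any distributed region created by the test):

    forceQueuing(r);       // Connection.FORCE_ASYNC_QUEUE = true; wait until a flush is in progress
    try {
      // ... puts that are expected to be queued for the slow receiver ...
    } finally {
      forceQueueFlush();   // Connection.FORCE_ASYNC_QUEUE = false; wait for the async threads to go away
    }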
-
- /**
- * Make sure that noack puts to a receiver
- * will eventually queue and then catch up.
- */
- public void testNoAck() throws CacheException {
- final AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_NO_ACK);
- final Region r = createRootRegion("slowrec", factory.create());
- final DMStats stats = getSystem().getDistributionManager().getStats();
-
- // create receiver in vm0 with queuing enabled
- Properties p = new Properties();
- p.setProperty("async-distribution-timeout", "1");
- doCreateOtherVm(p, false);
-
- int repeatCount = 2;
- int count = 0;
- while (repeatCount-- > 0) {
- forceQueuing(r);
- final Object key = "key";
- long queuedMsgs = stats.getAsyncQueuedMsgs();
- long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
-// long conflatedMsgs = stats.getAsyncConflatedMsgs();
- long queueSize = stats.getAsyncQueueSize();
- String lastValue = "";
- final long intialQueuedMsgs = queuedMsgs;
- long curQueuedMsgs = queuedMsgs - dequeuedMsgs;
- try {
- // loop while we still have queued the initially queued msgs
- // OR the cur # of queued msgs < 6
- while (dequeuedMsgs < intialQueuedMsgs || curQueuedMsgs <= 6) {
- String value = "count=" + count;
- lastValue = value;
- r.put(key, value);
- count ++;
- queueSize = stats.getAsyncQueueSize();
- queuedMsgs = stats.getAsyncQueuedMsgs();
- dequeuedMsgs = stats.getAsyncDequeuedMsgs();
- curQueuedMsgs = queuedMsgs - dequeuedMsgs;
- }
- getLogWriter().info("After " + count + " " + " puts slowrec mode
kicked in by queuing " + queuedMsgs + " for a total size of " + queueSize);
- } finally {
- forceQueueFlush();
- }
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return stats.getAsyncQueueSize() == 0;
- }
- public String description() {
- return "Waiting for queues to empty";
- }
- };
- final long start = System.currentTimeMillis();
- DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true);
- final long finish = System.currentTimeMillis();
- getLogWriter().info("After " + (finish - start) + " ms async msgs where
flushed. A total of " + stats.getAsyncDequeuedMsgs() + " were flushed.
lastValue=" + lastValue);
-
- checkLastValueInOtherVm(lastValue, null);
- }
- }
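For reference, a sketch collecting the slow-receiver tuning properties that the tests in this file set on the receiving VM via doCreateOtherVm; the values are copied from the individual tests, and the comments describe only the behavior those tests assert, not authoritative documentation of the properties:

    Properties p = new Properties();
    // milliseconds a publisher to this member will try synchronous distribution
    // before switching to async queuing for it (the tests use 1, 2, 5 and 4000)
    p.setProperty("async-distribution-timeout", "1");
    // queue size limit in megabytes; exceeding it forces a disconnect of the
    // slow receiver (testSizeDisconnect uses 1 meg)
    p.setProperty("async-max-queue-size", "1");
    // milliseconds a receiver may stay queued before it is disconnected
    // (donottestTimeoutDisconnect uses 500 ms)
    p.setProperty("async-queue-timeout", "500");
    doCreateOtherVm(p, false);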
- /**
- * Create a region named AckRegion with ACK scope
- */
- protected Region createAckRegion(boolean mirror, boolean conflate) throws CacheException {
- final AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_ACK);
- if (mirror) {
- factory.setDataPolicy(DataPolicy.REPLICATE);
- }
- if (conflate) {
- factory.setEnableAsyncConflation(true);
- }
- final Region r = createRootRegion("AckRegion", factory.create());
- return r;
- }
- /**
- * Make sure that noack puts to a receiver
- * will eventually queue and then catch up with conflation
- */
- public void testNoAckConflation() throws CacheException {
- final AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_NO_ACK);
- factory.setEnableAsyncConflation(true);
- final Region r = createRootRegion("slowrec", factory.create());
- final DMStats stats = getSystem().getDistributionManager().getStats();
-
- // create receiver in vm0 with queuing enabled
- Properties p = new Properties();
- p.setProperty("async-distribution-timeout", "1");
- doCreateOtherVm(p, false);
-
- forceQueuing(r);
- final Object key = "key";
- int count = 0;
-// long queuedMsgs = stats.getAsyncQueuedMsgs();
-// long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
- final long initialConflatedMsgs = stats.getAsyncConflatedMsgs();
-// long queueSize = stats.getAsyncQueueSize();
- String lastValue = "";
- final long intialDeQueuedMsgs = stats.getAsyncDequeuedMsgs();
- long start = 0;
- try {
- while ((stats.getAsyncConflatedMsgs()-initialConflatedMsgs) < 1000) {
- String value = "count=" + count;
- lastValue = value;
- r.put(key, value);
- count ++;
- // getLogWriter().info("After " + count + " "
- // + " puts queueSize=" + queueSize
- // + " queuedMsgs=" + queuedMsgs
- // + " dequeuedMsgs=" + dequeuedMsgs
- // + " conflatedMsgs=" + conflatedMsgs);
- }
- start = System.currentTimeMillis();
- } finally {
- forceQueueFlush();
- }
-// queueSize = stats.getAsyncQueueSize();
-// queuedMsgs = stats.getAsyncQueuedMsgs();
-
-// getLogWriter().info("After " + count + " "
-// + " puts slowrec mode kicked in by queuing "
-// + queuedMsgs + " for a total size of " + queueSize
-// + " conflatedMsgs=" + conflatedMsgs
-// + " dequeuedMsgs=" + dequeuedMsgs);
-// final long start = System.currentTimeMillis();
-// while (stats.getAsyncQueuedMsgs() > stats.getAsyncDequeuedMsgs()) {
-// try {Thread.sleep(100);} catch (InterruptedException ignore) {}
-// queueSize = stats.getAsyncQueueSize();
-// queuedMsgs = stats.getAsyncQueuedMsgs();
-// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
-// conflatedMsgs = stats.getAsyncConflatedMsgs();
-// getLogWriter().info("After sleeping"
-// + " queueSize=" + queueSize
-// + " queuedMsgs=" + queuedMsgs
-// + " dequeuedMsgs=" + dequeuedMsgs
-// + " conflatedMsgs=" + conflatedMsgs);
- final long finish = System.currentTimeMillis();
- getLogWriter().info("After " + (finish - start) + " ms async msgs where
flushed. A total of " + (stats.getAsyncDequeuedMsgs()-intialDeQueuedMsgs) + "
were flushed. Leaving a queue size of " + stats.getAsyncQueueSize() + ". The
lastValue was " + lastValue);
-
- checkLastValueInOtherVm(lastValue, null);
- }
- /**
- * make sure ack does not hang
- * make sure two ack updates do not conflate but are both queued
- */
- public void testAckConflation() throws CacheException {
- final AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_NO_ACK);
- factory.setEnableAsyncConflation(true);
- final Region r = createRootRegion("slowrec", factory.create());
- final Region ar = createAckRegion(false, true);
- ar.create("ackKey", "ackValue");
-
- final DMStats stats = getSystem().getDistributionManager().getStats();
-
- // create receiver in vm0 with queuing enabled
- Properties p = new Properties();
- p.setProperty("async-distribution-timeout", "2");
- doCreateOtherVm(p, false);
-
- forceQueuing(r);
- {
- // make sure ack does not hang
- // make sure two ack updates do not conflate but are both queued
- long startQueuedMsgs = stats.getAsyncQueuedMsgs();
- long startConflatedMsgs = stats.getAsyncConflatedMsgs();
- Thread t = new Thread(new Runnable() {
- public void run() {
- ar.put("ackKey", "ackValue");
- }
- });
- t.start();
- Thread t2 = new Thread(new Runnable() {
- public void run() {
- ar.put("ackKey", "ackValue");
- }
- });
- t2.start();
- // give threads a chance to get queued
- try {Thread.sleep(100);} catch (InterruptedException ignore) {fail("interrupted");}
- forceQueueFlush();
- DistributedTestCase.join(t, 2 * 1000, getLogWriter());
- DistributedTestCase.join(t2, 2 * 1000, getLogWriter());
- long endQueuedMsgs = stats.getAsyncQueuedMsgs();
- long endConflatedMsgs = stats.getAsyncConflatedMsgs();
- assertEquals(startConflatedMsgs, endConflatedMsgs);
- // queue should be flushed by the time we get an ack
- assertEquals(endQueuedMsgs, stats.getAsyncDequeuedMsgs());
- assertEquals(startQueuedMsgs+2, endQueuedMsgs);
- }
- }
- /**
- * Make sure that only sequences of updates are conflated
- * Also checks that sending to a conflating region and non-conflating region
- * does the correct thing.
- * Test disabled because it intermittently fails due to race conditions
- * in test. This has been fixed in congo's tests. See bug 35357.
- */
- public void _disabled_testConflationSequence() throws CacheException {
- final AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_NO_ACK);
- factory.setEnableAsyncConflation(true);
- final Region r = createRootRegion("slowrec", factory.create());
- factory.setEnableAsyncConflation(false);
- final Region noConflate = createRootRegion("noConflate", factory.create());
- final DMStats stats = getSystem().getDistributionManager().getStats();
-
- // create receiver in vm0 with queuing enabled
- Properties p = new Properties();
- p.setProperty("async-distribution-timeout", "1");
- doCreateOtherVm(p, false);
- {
- VM vm = getOtherVm();
- vm.invoke(new CacheSerializableRunnable("create noConflate") {
- public void run2() throws CacheException {
- AttributesFactory af = new AttributesFactory();
- af.setScope(Scope.DISTRIBUTED_NO_ACK);
- af.setDataPolicy(DataPolicy.REPLICATE);
- createRootRegion("noConflate", af.create());
- }
- });
- }
-
- // now make sure update+destroy does not conflate
- final Object key = "key";
- getLogWriter().info("[testConflationSequence] about to force queuing");
- forceQueuing(r);
-
- int count = 0;
- String value = "";
- String lastValue = value;
- Object mylcb = null;
- long initialConflatedMsgs = stats.getAsyncConflatedMsgs();
-// long initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
-// long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
- int endCount = count+60;
-
- getLogWriter().info("[testConflationSequence] about to build up queue");
- long begin = System.currentTimeMillis();
- while (count < endCount) {
- value = "count=" + count;
- lastValue = value;
- r.create(key, value);
- count ++;
- value = "count=" + count;
- lastValue = value;
- r.put(key, value);
- count ++;
- mylcb = value;
- r.destroy(key, mylcb);
- count ++;
- lastValue = null;
-// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
- assertTrue(System.currentTimeMillis() < begin+1000*60*2);
- }
- assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
- forceQueueFlush();
- checkLastValueInOtherVm(lastValue, mylcb);
-
- // now make sure create+update+localDestroy does not conflate
- getLogWriter().info("[testConflationSequence] force queuing
create-update-destroy");
- forceQueuing(r);
- initialConflatedMsgs = stats.getAsyncConflatedMsgs();
-// initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
-// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
- endCount = count + 40;
-
- getLogWriter().info("[testConflationSequence] create-update-destroy");
- begin = System.currentTimeMillis();
- while (count < endCount) {
- value = "count=" + count;
- lastValue = value;
- r.create(key, value);
- count++;
- value = "count=" + count;
- lastValue = value;
- r.put(key, value);
- count ++;
- r.localDestroy(key);
-// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
- assertTrue(System.currentTimeMillis() < begin+1000*60*2);
- }
- assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
- forceQueueFlush();
- checkLastValueInOtherVm(lastValue, null);
-
- // now make sure update+invalidate does not conflate
- getLogWriter().info("[testConflationSequence] force queuing
update-invalidate");
- forceQueuing(r);
- initialConflatedMsgs = stats.getAsyncConflatedMsgs();
-// initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
- value = "count=" + count;
- lastValue = value;
- r.create(key, value);
- count++;
-// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
- endCount = count + 40;
-
- getLogWriter().info("[testConflationSequence] update-invalidate");
- begin = System.currentTimeMillis();
- while (count < endCount) {
- value = "count=" + count;
- lastValue = value;
- r.put(key, value);
- count ++;
- r.invalidate(key);
- count ++;
- lastValue = CHECK_INVALID;
-// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
- assertTrue(System.currentTimeMillis() < begin+1000*60*2);
- }
- assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
- forceQueueFlush();
- getLogWriter().info("[testConflationSequence] assert other vm");
- checkLastValueInOtherVm(lastValue, null);
-
- r.destroy(key);
-
- // now make sure updates to a conflating region are conflated even while
- // updates to a non-conflating are not.
- getLogWriter().info("[testConflationSequence] conflate & no-conflate
regions");
- forceQueuing(r);
- final int initialAsyncSocketWrites = stats.getAsyncSocketWrites();
-// initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
-
- value = "count=" + count;
- lastValue = value;
- long conflatedMsgs = stats.getAsyncConflatedMsgs();
- long queuedMsgs = stats.getAsyncQueuedMsgs();
- r.create(key, value);
- queuedMsgs++;
- assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
- assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
- r.put(key, value);
- queuedMsgs++;
- assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
- assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
- noConflate.create(key, value);
- queuedMsgs++;
- assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
- assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
- noConflate.put(key, value);
- queuedMsgs++;
- assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
- assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
- count++;
-// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
- endCount = count + 80;
-
- begin = System.currentTimeMillis();
- getLogWriter().info("[testConflationSequence:DEBUG] count=" + count
- + " queuedMsgs=" + stats.getAsyncQueuedMsgs()
- + " conflatedMsgs=" + stats.getAsyncConflatedMsgs()
- + " dequeuedMsgs=" + stats.getAsyncDequeuedMsgs()
- + " asyncSocketWrites=" + stats.getAsyncSocketWrites()
- );
- while (count < endCount) {
- // make sure we continue to have a flush in progress
- assertEquals(1, stats.getAsyncThreads());
- assertEquals(1, stats.getAsyncQueues());
- assertTrue(stats.getAsyncQueueFlushesInProgress() > 0);
- // make sure we are not completing any flushing while this loop is in progress
- assertEquals(initialAsyncSocketWrites, stats.getAsyncSocketWrites());
- value = "count=" + count;
- lastValue = value;
- r.put(key, value);
- count ++;
- // make sure it was conflated and not queued
- assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
- conflatedMsgs++;
- assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
- noConflate.put(key, value);
- // make sure it was queued and not conflated
- queuedMsgs++;
- assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
- assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
-// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
- assertTrue(System.currentTimeMillis() < begin+1000*60*2);
- }
-
- forceQueueFlush();
- getLogWriter().info("[testConflationSequence] assert other vm");
- checkLastValueInOtherVm(lastValue, null);
- }
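The final loop in the test above pins down the conflation rules the rest of this file relies on: only back-to-back updates to an already-queued key in a region with async conflation enabled are conflated, while creates, invalidates, destroys, and every operation on a non-conflating region are queued individually. Condensed, the per-iteration invariant looks like this (r conflates, noConflate does not, and the counters come from DMStats exactly as in the test):

    long queued = stats.getAsyncQueuedMsgs();
    long conflated = stats.getAsyncConflatedMsgs();

    r.put(key, value);            // repeated update to a queued key in the conflating region
    conflated++;                  // ... is conflated rather than queued again
    assertEquals(queued, stats.getAsyncQueuedMsgs());
    assertEquals(conflated, stats.getAsyncConflatedMsgs());

    noConflate.put(key, value);   // the same update in the non-conflating region
    queued++;                     // ... is queued and never conflated
    assertEquals(queued, stats.getAsyncQueuedMsgs());
    assertEquals(conflated, stats.getAsyncConflatedMsgs());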
- /**
- * Make sure that exceeding the queue size limit causes a disconnect.
- */
- public void testSizeDisconnect() throws CacheException {
- final String expected =
- "com.gemstone.gemfire.internal.tcp.ConnectionException: Forced
disconnect sent to" +
- "||java.io.IOException: Broken pipe";
- final String addExpected =
- "<ExpectedException action=add>" + expected + "</ExpectedException>";
- final String removeExpected =
- "<ExpectedException action=remove>" + expected + "</ExpectedException>";
-
- final AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_NO_ACK);
- final Region r = createRootRegion("slowrec", factory.create());
- final DM dm = getSystem().getDistributionManager();
- final DMStats stats = dm.getStats();
- // set others before vm0 connects
- final Set others = dm.getOtherDistributionManagerIds();
-
- // create receiver in vm0 with queuing enabled
- Properties p = new Properties();
- p.setProperty("async-distribution-timeout", "5");
- p.setProperty("async-max-queue-size", "1"); // 1 meg
- doCreateOtherVm(p, false);
-
-
- final Object key = "key";
- final int VALUE_SIZE = 1024 * 100; // .1M async-max-queue-size should give us 10 of these 100K msgs before queue full
- final byte[] value = new byte[VALUE_SIZE];
- int count = 0;
- forceQueuing(r);
- long queuedMsgs = stats.getAsyncQueuedMsgs();
- long queueSize = stats.getAsyncQueueSize();
-
- getCache().getLogger().info(addExpected);
- try {
- while (stats.getAsyncQueueSizeExceeded() == 0 && stats.getAsyncQueueTimeouts() == 0) {
- r.put(key, value);
- count ++;
- if (stats.getAsyncQueueSize() > 0) {
- queuedMsgs = stats.getAsyncQueuedMsgs();
- queueSize = stats.getAsyncQueueSize();
- }
- if (count > 100) {
- fail("should have exceeded max-queue-size by now");
- }
- }
- getLogWriter().info("After " + count + " " + VALUE_SIZE + " byte puts
slowrec mode kicked in but the queue filled when its size reached " + queueSize
+ " with " + queuedMsgs + " msgs");
- // make sure we lost a connection to vm0
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return dm.getOtherDistributionManagerIds().size() <= others.size() && stats.getAsyncQueueSize() == 0;
- }
- public String description() {
- return "waiting for connection loss";
- }
- };
- DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true);
- }
- finally {
- forceQueueFlush();
- getCache().getLogger().info(removeExpected);
- }
- assertEquals(others, dm.getOtherDistributionManagerIds());
- assertEquals(0, stats.getAsyncQueueSize());
- }
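Both disconnect tests wrap the section that forces the disconnect in ExpectedException log markers so that the resulting connection errors do not fail the run; the pattern, as used by testSizeDisconnect above and donottestTimeoutDisconnect below:

    final String expected =
        "com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to" +
        "||java.io.IOException: Broken pipe";
    getCache().getLogger().info("<ExpectedException action=add>" + expected + "</ExpectedException>");
    try {
      // ... operations expected to trigger the forced disconnect ...
    } finally {
      getCache().getLogger().info("<ExpectedException action=remove>" + expected + "</ExpectedException>");
    }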
- /**
- * Make sure that exceeding the async-queue-timeout causes a disconnect.<p>
- * [bruce] This test was disabled when the SlowRecDUnitTest was re-enabled
- * in build.xml in the splitbrainNov07 branch. It had been disabled since
- * June 2006 due to hangs. Some of the tests, like this one, still need
- * work because they periodically (some quite often) fail.
- */
- public void donottestTimeoutDisconnect() throws CacheException {
- final String expected =
- "com.gemstone.gemfire.internal.tcp.ConnectionException: Forced
disconnect sent to" +
- "||java.io.IOException: Broken pipe";
- final String addExpected =
- "<ExpectedException action=add>" + expected + "</ExpectedException>";
- final String removeExpected =
- "<ExpectedException action=remove>" + expected + "</ExpectedException>";
-
- final AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_NO_ACK);
- final Region r = createRootRegion("slowrec", factory.create());
- final DM dm = getSystem().getDistributionManager();
- final DMStats stats = dm.getStats();
- // set others before vm0 connects
- final Set others = dm.getOtherDistributionManagerIds();
-
- // create receiver in vm0 with queuing enabled
- Properties p = new Properties();
- p.setProperty("async-distribution-timeout", "5");
- p.setProperty("async-queue-timeout", "500"); // 500 ms
- doCreateOtherVm(p, true);
-
-
- final Object key = "key";
- final int VALUE_SIZE = 1024; // 1k
- final byte[] value = new byte[VALUE_SIZE];
- int count = 0;
- long queuedMsgs = stats.getAsyncQueuedMsgs();
- long queueSize = stats.getAsyncQueueSize();
- final long timeoutLimit = System.currentTimeMillis() + 5000;
-
- getCache().getLogger().info(addExpected);
- try {
- while (stats.getAsyncQueueTimeouts() == 0) {
- r.put(key, value);
- count ++;
- if (stats.getAsyncQueueSize() > 0) {
- queuedMsgs = stats.getAsyncQueuedMsgs();
- queueSize = stats.getAsyncQueueSize();
- }
- if (System.currentTimeMillis() > timeoutLimit) {
- fail("should have exceeded async-queue-timeout by now");
- }
- }
- getLogWriter().info("After " + count + " " + VALUE_SIZE + " byte puts
slowrec mode kicked in but the queue filled when its size reached " + queueSize
+ " with " + queuedMsgs + " msgs");
- // make sure we lost a connection to vm0
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- if (dm.getOtherDistributionManagerIds().size() > others.size()) {
- return false;
- }
- return stats.getAsyncQueueSize() == 0;
- }
- public String description() {
- return "waiting for departure";
- }
- };
- DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
- }
- finally {
- getCache().getLogger().info(removeExpected);
- }
- assertEquals(others, dm.getOtherDistributionManagerIds());
- assertEquals(0, stats.getAsyncQueueSize());
- }
-
- // static helper methods ---------------------------------------------------
-
- private static final String KEY_SLEEP = "KEY_SLEEP";
- private static final String KEY_WAIT = "KEY_WAIT";
- private static final String KEY_DISCONNECT = "KEY_DISCONNECT";
-
- protected final static int CALLBACK_CREATE = 0;
- protected final static int CALLBACK_UPDATE = 1;
- protected final static int CALLBACK_INVALIDATE = 2;
- protected final static int CALLBACK_DESTROY = 3;
- protected final static int CALLBACK_REGION_INVALIDATE = 4;
-
- protected final static Integer CALLBACK_CREATE_INTEGER = new Integer(CALLBACK_CREATE);
- protected final static Integer CALLBACK_UPDATE_INTEGER = new Integer(CALLBACK_UPDATE);
- protected final static Integer CALLBACK_INVALIDATE_INTEGER = new Integer(CALLBACK_INVALIDATE);
- protected final static Integer CALLBACK_DESTROY_INTEGER = new Integer(CALLBACK_DESTROY);
- protected final static Integer CALLBACK_REGION_INVALIDATE_INTEGER = new Integer(CALLBACK_REGION_INVALIDATE);
-
- private static class CallbackWrapper {
- public final Object callbackArgument;
- public final int callbackType;
- public CallbackWrapper(Object callbackArgument, int callbackType) {
- this.callbackArgument = callbackArgument;
- this.callbackType = callbackType;
- }
- public String toString() {
- return "CallbackWrapper: " + callbackArgument.toString() + " of type " +
callbackType;
- }
- }
-
- protected static class ControlListener extends CacheListenerAdapter {
- public final LinkedList callbackArguments = new LinkedList();
- public final LinkedList callbackTypes = new LinkedList();
- public final Object CONTROL_LOCK = new Object();
-
- public void afterCreate(EntryEvent event) {
- getLogWriter().info(event.getRegion().getName() + " afterCreate " +
event.getKey());
- synchronized(this.CONTROL_LOCK) {
- if (event.getCallbackArgument() != null) {
- this.callbackArguments.add(
- new CallbackWrapper(event.getCallbackArgument(), CALLBACK_CREATE));
- this.callbackTypes.add(CALLBACK_CREATE_INTEGER);
- this.CONTROL_LOCK.notifyAll();
- }
- }
- processEvent(event);
- }
- public void afterUpdate(EntryEvent event) {
- getLogWriter().info(event.getRegion().getName() + " afterUpdate " +
event.getKey());
- synchronized(this.CONTROL_LOCK) {
- if (event.getCallbackArgument() != null) {
- this.callbackArguments.add(
- new CallbackWrapper(event.getCallbackArgument(), CALLBACK_UPDATE));
- this.callbackTypes.add(CALLBACK_UPDATE_INTEGER);
- this.CONTROL_LOCK.notifyAll();
- }
- }
- processEvent(event);
- }
- public void afterInvalidate(EntryEvent event) {
- synchronized(this.CONTROL_LOCK) {
- if (event.getCallbackArgument() != null) {
- this.callbackArguments.add(
- new CallbackWrapper(event.getCallbackArgument(), CALLBACK_INVALIDATE));
- this.callbackTypes.add(CALLBACK_INVALIDATE_INTEGER);
- this.CONTROL_LOCK.notifyAll();
- }
- }
- }
- public void afterDestroy(EntryEvent event) {
- synchronized(this.CONTROL_LOCK) {
- if (event.getCallbackArgument() != null) {
- this.callbackArguments.add(
- new CallbackWrapper(event.getCallbackArgument(), CALLBACK_DESTROY));
- this.callbackTypes.add(CALLBACK_DESTROY_INTEGER);
- this.CONTROL_LOCK.notifyAll();
- }
- }
- }
- public void afterRegionInvalidate(RegionEvent event) {
- synchronized(this.CONTROL_LOCK) {
- if (event.getCallbackArgument() != null) {
- this.callbackArguments.add(
- new CallbackWrapper(event.getCallbackArgument(), CALLBACK_REGION_INVALIDATE));
- this.callbackTypes.add(CALLBACK_REGION_INVALIDATE_INTEGER);
- this.CONTROL_LOCK.notifyAll();
- }
- }
- }
- private void processEvent(EntryEvent event) {
- if (event.getKey().equals(KEY_SLEEP)) {
- processSleep(event);
- }
- else if (event.getKey().equals(KEY_WAIT)) {
- processWait(event);
- }
- else if (event.getKey().equals(KEY_DISCONNECT)) {
- processDisconnect(event);
- }
- }
- private void processSleep(EntryEvent event) {
- int sleepMs = ((Integer)event.getNewValue()).intValue();
- getLogWriter().info("[processSleep] sleeping for " + sleepMs);
- try {
- Thread.sleep(sleepMs);
- } catch (InterruptedException ignore) {fail("interrupted");}
- }
- private void processWait(EntryEvent event) {
- int sleepMs = ((Integer)event.getNewValue()).intValue();
- getLogWriter().info("[processWait] waiting for " + sleepMs);
- synchronized(this.CONTROL_LOCK) {
- try {
- this.CONTROL_LOCK.wait(sleepMs);
- } catch (InterruptedException ignore) {return;}
- }
- }
- private void processDisconnect(EntryEvent event) {
- getLogWriter().info("[processDisconnect] disconnecting");
- disconnectFromDS();
- }
- };
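ControlListener is driven by a small control-key protocol: putting KEY_SLEEP makes the receiving listener sleep for the given number of milliseconds, KEY_WAIT makes it block on CONTROL_LOCK for up to the given time, and KEY_DISCONNECT makes the receiving VM disconnect from the distributed system. A sketch of how the tests below stall and then release the receiver (listener stands in for one of the static ControlListener fields, e.g. doTestDisconnectCleanup_Listener):

    // stall the receiving VM so the sender can build up its async queue
    r.put(KEY_WAIT, new Integer(1000 * 60 * 5)); // wait up to 5 minutes

    // ... sender puts until DMStats shows queuing (and, if enabled, conflation) ...

    // release the receiving VM by notifying the lock in that VM
    getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
      public void run() {
        synchronized (listener.CONTROL_LOCK) {
          listener.CONTROL_LOCK.notifyAll();
        }
      }
    });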
-
- /**
- * Make sure multiple no ack regions conflate properly.
- * [bruce] disabled when use of this dunit test class was reenabled in
- * the splitbrainNov07 branch. The class had been disabled since
- * June 2006 r13222 in the trunk. This test is failing because conflation
- * isn't kicking in for some reason.
- */
- public void donottestMultipleRegionConflation() throws Throwable {
- try {
- doTestMultipleRegionConflation();
- }
- catch (VirtualMachineError e) {
- SystemFailure.initiateFailure(e);
- throw e;
- }
- catch (Throwable t) {
- getLogWriter().error("Encountered exception: ", t);
- throw t;
- }
- finally {
- // make sure other vm was notified even if test failed
- getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
- public void run() {
- synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
- doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.notifyAll();
- }
- }
- });
- }
- }
- protected static ControlListener doTestMultipleRegionConflation_R1_Listener;
- protected static ControlListener doTestMultipleRegionConflation_R2_Listener;
- private void doTestMultipleRegionConflation() throws Exception {
- final AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_NO_ACK);
- factory.setEnableAsyncConflation(true);
- final Region r1 = createRootRegion("slowrec1", factory.create());
- final Region r2 = createRootRegion("slowrec2", factory.create());
-
- assertTrue(getSystem().isConnected());
- assertNotNull(r1);
- assertFalse(r1.isDestroyed());
- assertNotNull(getCache());
- assertNotNull(getCache().getRegion("slowrec1"));
- assertNotNull(r2);
- assertFalse(r2.isDestroyed());
- assertNotNull(getCache());
- assertNotNull(getCache().getRegion("slowrec2"));
-
- final DM dm = getSystem().getDistributionManager();
- final Serializable controllerVM = dm.getDistributionManagerId();
- final DMStats stats = dm.getStats();
- final int millisToWait = 1000 * 60 * 5; // 5 minutes
-
- // set others before vm0 connects
- long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
-
- // create receiver in vm0 with queuing enabled
- final Properties p = new Properties();
- p.setProperty("async-distribution-timeout", "5");
- p.setProperty("async-queue-timeout", "86400000"); // max value
- p.setProperty("async-max-queue-size", "1024"); // max value
-
- getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
- public void run2() throws CacheException {
- getSystem(p);
-
- DM dm = getSystem().getDistributionManager();
- assertTrue(dm.getDistributionManagerIds().contains(controllerVM));
-
- AttributesFactory af = new AttributesFactory();
- af.setScope(Scope.DISTRIBUTED_NO_ACK);
- af.setDataPolicy(DataPolicy.REPLICATE);
-
- doTestMultipleRegionConflation_R1_Listener = new ControlListener();
- af.setCacheListener(doTestMultipleRegionConflation_R1_Listener);
- createRootRegion("slowrec1", af.create());
-
- doTestMultipleRegionConflation_R2_Listener = new ControlListener();
- af.setCacheListener(doTestMultipleRegionConflation_R2_Listener);
- createRootRegion("slowrec2", af.create());
- }
- });
-
- // put vm0 cache listener into wait
- getLogWriter().info("[doTestMultipleRegionConflation] about to put vm0
into wait");
- r1.put(KEY_WAIT, new Integer(millisToWait));
-
- // build up queue size
- getLogWriter().info("[doTestMultipleRegionConflation] building up queue
size...");
- final Object key = "key";
- final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
- final int VALUE_SIZE = socketBufferSize*3;
- //final int VALUE_SIZE = 1024 * 1024 ; // 1 MB
- final byte[] value = new byte[VALUE_SIZE];
-
- int count = 0;
- while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
- count++;
- r1.put(key, value);
- }
-
- getLogWriter().info("[doTestMultipleRegionConflation] After " +
- count + " puts of size " + VALUE_SIZE +
- " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
-
- // put values that will be asserted
- final Object key1 = "key1";
- final Object key2 = "key2";
- Object putKey = key1;
- boolean flag = true;
- for (int i = 0; i < 30; i++) {
- if (i == 10) putKey = key2;
- if (flag) {
- if (i == 6) {
- r1.invalidate(putKey, new Integer(i));
- } else if (i == 24) {
- r1.invalidateRegion(new Integer(i));
- } else {
- r1.put(putKey, value, new Integer(i));
- }
- } else {
- if (i == 15) {
- r2.destroy(putKey, new Integer(i));
- } else {
- r2.put(putKey, value, new Integer(i));
- }
- }
- flag = !flag;
- }
-
- // r1: key1, 0, create
- // r1: key1, 4, update
- // r1: key1, 6, invalidate
- // r1: key1, 8, update
-
- // r1: key2, 10, create
- // r1: 24, invalidateRegion
- // r1: key2, 28, update
-
- // r2: key1, 1, create
- // r2: key1, 9, update
-
- // r2: key2, 11, create
- // r2: key2, 13, update
- // r2: key2, 15, destroy
- // r2: key2, 17, create
- // r2: key2, 29, update
-
- final int[] r1ExpectedArgs = new int[] { 0, 4, 6, 8, 10, 24, 28 };
- final int[] r1ExpectedTypes = new int[] /* 0, 1, 2, 1, 0, 4, 1 */
- { CALLBACK_CREATE, CALLBACK_UPDATE, CALLBACK_INVALIDATE, CALLBACK_UPDATE,
- CALLBACK_CREATE, CALLBACK_REGION_INVALIDATE, CALLBACK_UPDATE };
-
- final int[] r2ExpectedArgs = new int[] { 1, 9, 11, 13, 15, 17, 29 };
- final int[] r2ExpectedTypes = new int[]
- { CALLBACK_CREATE, CALLBACK_UPDATE, CALLBACK_CREATE, CALLBACK_UPDATE,
- CALLBACK_DESTROY, CALLBACK_CREATE, CALLBACK_UPDATE };
-
- // send notify to vm0
- getLogWriter().info("[doTestMultipleRegionConflation] wake up vm0");
- getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
- public void run() {
- synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
- doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.notifyAll();
- }
- }
- });
-
- // wait for queue to be flushed
- getLogWriter().info("[doTestMultipleRegionConflation] wait for vm0");
- getOtherVm().invoke(new SerializableRunnable("Wait for other vm") {
- public void run() {
- try {
- synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
- while (doTestMultipleRegionConflation_R1_Listener.callbackArguments.size() < r1ExpectedArgs.length) {
- doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.wait(millisToWait);
- }
- }
- synchronized(doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK) {
- while (doTestMultipleRegionConflation_R2_Listener.callbackArguments.size() < r2ExpectedArgs.length) {
- doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK.wait(millisToWait);
- }
- }
- } catch (InterruptedException ignore) {fail("interrupted");}
- }
- });
-
- // assert values on both listeners
- getLogWriter().info("[doTestMultipleRegionConflation] assert callback
arguments");
- getOtherVm().invoke(new SerializableRunnable("Assert callback arguments") {
- public void run() {
- synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
- getLogWriter().info("doTestMultipleRegionConflation_R1_Listener.callbackArguments=" + doTestMultipleRegionConflation_R1_Listener.callbackArguments);
- getLogWriter().info("doTestMultipleRegionConflation_R1_Listener.callbackTypes=" + doTestMultipleRegionConflation_R1_Listener.callbackTypes);
- assertEquals(doTestMultipleRegionConflation_R1_Listener.callbackArguments.size(),
- doTestMultipleRegionConflation_R1_Listener.callbackTypes.size());
- int i = 0;
- for (Iterator iter = doTestMultipleRegionConflation_R1_Listener.callbackArguments.iterator(); iter.hasNext();) {
- CallbackWrapper wrapper = (CallbackWrapper) iter.next();
- assertEquals(new Integer(r1ExpectedArgs[i]),
- wrapper.callbackArgument);
- assertEquals(new Integer(r1ExpectedTypes[i]),
- doTestMultipleRegionConflation_R1_Listener.callbackTypes.get(i));
- i++;
- }
- }
- synchronized(doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK) {
- getLogWriter().info("doTestMultipleRegionConflation_R2_Listener.callbackArguments=" + doTestMultipleRegionConflation_R2_Listener.callbackArguments);
- getLogWriter().info("doTestMultipleRegionConflation_R2_Listener.callbackTypes=" + doTestMultipleRegionConflation_R2_Listener.callbackTypes);
- assertEquals(doTestMultipleRegionConflation_R2_Listener.callbackArguments.size(),
- doTestMultipleRegionConflation_R2_Listener.callbackTypes.size());
- int i = 0;
- for (Iterator iter = doTestMultipleRegionConflation_R2_Listener.callbackArguments.iterator(); iter.hasNext();) {
- CallbackWrapper wrapper = (CallbackWrapper) iter.next();
- assertEquals(new Integer(r2ExpectedArgs[i]),
- wrapper.callbackArgument);
- assertEquals(new Integer(r2ExpectedTypes[i]),
- doTestMultipleRegionConflation_R2_Listener.callbackTypes.get(i));
- i++;
- }
- }
- }
- });
- }
-
- /**
- * Make sure a disconnect causes queue memory to be released.
- */
- public void testDisconnectCleanup() throws Throwable {
- try {
- doTestDisconnectCleanup();
- }
- catch (VirtualMachineError e) {
- SystemFailure.initiateFailure(e);
- throw e;
- }
- catch (Throwable t) {
- getLogWriter().error("Encountered exception: ", t);
- throw t;
- }
- finally {
- // make sure other vm was notified even if test failed
- getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
- public void run() {
- synchronized(doTestDisconnectCleanup_Listener.CONTROL_LOCK) {
- doTestDisconnectCleanup_Listener.CONTROL_LOCK.notifyAll();
- }
- }
- });
- }
- }
- protected static ControlListener doTestDisconnectCleanup_Listener;
- private void doTestDisconnectCleanup() throws Exception {
- final AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_NO_ACK);
- final Region r = createRootRegion("slowrec", factory.create());
- final DM dm = getSystem().getDistributionManager();
- final DMStats stats = dm.getStats();
- // set others before vm0 connects
- final Set others = dm.getOtherDistributionManagerIds();
- long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
- final int initialQueues = stats.getAsyncQueues();
-
- // create receiver in vm0 with queuing enabled
- final Properties p = new Properties();
- p.setProperty("async-distribution-timeout", "5");
- p.setProperty("async-queue-timeout", "86400000"); // max value
- p.setProperty("async-max-queue-size", "1024"); // max value
-
- getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
- public void run2() throws CacheException {
- getSystem(p);
- AttributesFactory af = new AttributesFactory();
- af.setScope(Scope.DISTRIBUTED_NO_ACK);
- af.setDataPolicy(DataPolicy.REPLICATE);
-
- doTestDisconnectCleanup_Listener = new ControlListener();
- af.setCacheListener(doTestDisconnectCleanup_Listener);
- createRootRegion("slowrec", af.create());
- }
- });
-
- // put vm0 cache listener into wait
- getLogWriter().info("[testDisconnectCleanup] about to put vm0 into wait");
- int millisToWait = 1000 * 60 * 5; // 5 minutes
- r.put(KEY_WAIT, new Integer(millisToWait));
- r.put(KEY_DISCONNECT, KEY_DISCONNECT);
-
- // build up queue size
- getLogWriter().info("[testDisconnectCleanup] building up queue size...");
- final Object key = "key";
- final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
- final int VALUE_SIZE = socketBufferSize*3;
- //final int VALUE_SIZE = 1024 * 1024 ; // 1 MB
- final byte[] value = new byte[VALUE_SIZE];
-
- int count = 0;
- final long abortMillis = System.currentTimeMillis() + millisToWait;
- while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
- count++;
- r.put(key, value);
- assertFalse(System.currentTimeMillis() >= abortMillis);
- }
-
- getLogWriter().info("[testDisconnectCleanup] After " +
- count + " puts of size " + VALUE_SIZE +
- " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
-
- while (stats.getAsyncQueuedMsgs() < 10 ||
- stats.getAsyncQueueSize() < VALUE_SIZE*10) {
- count++;
- r.put(key, value);
- assertFalse(System.currentTimeMillis() >= abortMillis);
- }
- assertTrue(stats.getAsyncQueuedMsgs() >= 10);
-
- while (stats.getAsyncQueues() < 1) {
- pause(100);
- assertFalse(System.currentTimeMillis() >= abortMillis);
- }
-
- getLogWriter().info("[testDisconnectCleanup] After " +
- count + " puts of size " + VALUE_SIZE + " queue size has reached " +
- stats.getAsyncQueueSize() + " bytes and number of queues is " +
- stats.getAsyncQueues() + ".");
-
- assertTrue(stats.getAsyncQueueSize() >= (VALUE_SIZE*5));
- assertEquals(initialQueues+1, stats.getAsyncQueues());
-
- // assert vm0 is still connected
- assertTrue(dm.getOtherDistributionManagerIds().size() > others.size());
-
- // send notify to vm0
- getLogWriter().info("[testDisconnectCleanup] wake up vm0");
- getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
- public void run() {
- synchronized(doTestDisconnectCleanup_Listener.CONTROL_LOCK) {
- doTestDisconnectCleanup_Listener.CONTROL_LOCK.notifyAll();
- }
- }
- });
-
- // make sure we lost a connection to vm0
- getLogWriter().info("[testDisconnectCleanup] wait for vm0 to disconnect");
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return dm.getOtherDistributionManagerIds().size() <= others.size();
- }
- public String description() {
- return "waiting for disconnect";
- }
- };
- DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
- assertEquals(others, dm.getOtherDistributionManagerIds());
-
- // check free memory... perform wait loop with System.gc
- getLogWriter().info("[testDisconnectCleanup] wait for queue cleanup");
- ev = new WaitCriterion() {
- public boolean done() {
- if (stats.getAsyncQueues() <= initialQueues) {
- return true;
- }
- Runtime.getRuntime().gc();
- return false;
- }
- public String description() {
- return "waiting for queue cleanup";
- }
- };
- DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
-// getLogWriter().info("[testDisconnectCleanup] initialQueues=" +
-// initialQueues + " asyncQueues=" + stats.getAsyncQueues());
- assertEquals(initialQueues, stats.getAsyncQueues());
- }
-
- /**
- * Make sure a disconnect causes queue memory to be released.<p>
- * [bruce] This test was disabled when the SlowRecDUnitTest was re-enabled
- * in build.xml in the splitbrainNov07 branch. It had been disabled since
- * June 2006 due to hangs. Some of the tests, like this one, still need
- * work because they periodically (some quite often) fail.
- */
- public void donottestPartialMessage() throws Throwable {
- try {
- doTestPartialMessage();
- }
- catch (VirtualMachineError e) {
- SystemFailure.initiateFailure(e);
- throw e;
- }
- catch (Throwable t) {
- getLogWriter().error("Encountered exception: ", t);
- throw t;
- }
- finally {
- // make sure other vm was notified even if test failed
- getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
- public void run() {
- synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
- doTestPartialMessage_Listener.CONTROL_LOCK.notifyAll();
- }
- }
- });
- }
- }
- protected static ControlListener doTestPartialMessage_Listener;
- private void doTestPartialMessage() throws Exception {
- final AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_NO_ACK);
- factory.setEnableAsyncConflation(true);
- final Region r = createRootRegion("slowrec", factory.create());
- final DM dm = getSystem().getDistributionManager();
- final DMStats stats = dm.getStats();
-
- // set others before vm0 connects
-// final Set others = dm.getOtherDistributionManagerIds();
- long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
-// int initialQueues = stats.getAsyncQueues();
-
- // create receiver in vm0 with queuing enabled
- final Properties p = new Properties();
- p.setProperty("async-distribution-timeout", String.valueOf(1000*4)); // 4
sec
- p.setProperty("async-queue-timeout", "86400000"); // max value
- p.setProperty("async-max-queue-size", "1024"); // max value
-
- getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
- public void run2() throws CacheException {
- getSystem(p);
- AttributesFactory af = new AttributesFactory();
- af.setScope(Scope.DISTRIBUTED_NO_ACK);
- af.setDataPolicy(DataPolicy.REPLICATE);
-
- doTestPartialMessage_Listener = new ControlListener();
- af.setCacheListener(doTestPartialMessage_Listener);
- createRootRegion("slowrec", af.create());
- }
- });
-
- // put vm0 cache listener into wait
- getLogWriter().info("[testPartialMessage] about to put vm0 into wait");
- final int millisToWait = 1000 * 60 * 5; // 5 minutes
- r.put(KEY_WAIT, new Integer(millisToWait));
-
- // build up queue size
- getLogWriter().info("[testPartialMessage] building up queue size...");
- final Object key = "key";
- final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
- final int VALUE_SIZE = socketBufferSize*3;
- //1024 * 20; // 20 KB
- final byte[] value = new byte[VALUE_SIZE];
-
- int count = 0;
- while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
- count++;
- r.put(key, value, new Integer(count));
- }
-
- final int partialId = count;
- assertEquals(0, stats.getAsyncConflatedMsgs());
-
- getLogWriter().info("[testPartialMessage] After " +
- count + " puts of size " + VALUE_SIZE +
- " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
-
- pause(2000);
-
- // conflate 10 times
- while (stats.getAsyncConflatedMsgs() < 10) {
- count++;
- r.put(key, value, new Integer(count));
- if (count == partialId+1) {
-// long begin = System.currentTimeMillis();
-// while (stats.getAsyncQueues() < 1) {
-// pause(100);
-// assertFalse(System.currentTimeMillis() > begin+1000*10);
-// }
- assertEquals(initialQueuedMsgs+2, stats.getAsyncQueuedMsgs());
- assertEquals(0, stats.getAsyncConflatedMsgs());
- } else if (count == partialId+2) {
- assertEquals(initialQueuedMsgs+2, stats.getAsyncQueuedMsgs());
- assertEquals(1, stats.getAsyncConflatedMsgs());
- }
- }
-
- final int conflateId = count;
-
- final int[] expectedArgs = { partialId, conflateId };
-
- // send notify to vm0
- getLogWriter().info("[testPartialMessage] wake up vm0");
- getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
- public void run() {
- synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
- doTestPartialMessage_Listener.CONTROL_LOCK.notify();
- }
- }
- });
-
- // wait for queue to be flushed
- getLogWriter().info("[testPartialMessage] wait for vm0");
- getOtherVm().invoke(new SerializableRunnable("Wait for other vm") {
- public void run() {
- try {
- synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
- boolean done = false;
- while (!done) {
- if (doTestPartialMessage_Listener.callbackArguments.size()> 0) {
- CallbackWrapper last = (CallbackWrapper)
- doTestPartialMessage_Listener.callbackArguments.getLast();
- Integer lastId = (Integer) last.callbackArgument;
- if (lastId.intValue() == conflateId) {
- done = true;
- } else {
- doTestPartialMessage_Listener.CONTROL_LOCK.wait(millisToWait);
- }
- } else {
- doTestPartialMessage_Listener.CONTROL_LOCK.wait(millisToWait);
- }
- }
- }
- } catch (InterruptedException ignore) {fail("interrupted");}
- }
- });
-
- // assert values on both listeners
- getLogWriter().info("[testPartialMessage] assert callback arguments");
- getOtherVm().invoke(new SerializableRunnable("Assert callback arguments") {
- public void run() {
- synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
- getLogWriter().info("[testPartialMessage] " +
- "doTestPartialMessage_Listener.callbackArguments=" +
- doTestPartialMessage_Listener.callbackArguments);
-
- assertEquals(doTestPartialMessage_Listener.callbackArguments.size(),
- doTestPartialMessage_Listener.callbackTypes.size());
-
- int i = 0;
- Iterator argIter =
- doTestPartialMessage_Listener.callbackArguments.iterator();
- Iterator typeIter =
- doTestPartialMessage_Listener.callbackTypes.iterator();
-
- while (argIter.hasNext()) {
- CallbackWrapper wrapper = (CallbackWrapper) argIter.next();
- Integer arg = (Integer) wrapper.callbackArgument;
- typeIter.next(); // Integer type
- if (arg.intValue() < partialId) {
- continue;
- }
- assertEquals(new Integer(expectedArgs[i]), arg);
- //assertEquals(CALLBACK_UPDATE_INTEGER, type);
- i++;
- }
- }
- }
- });
-
- }
-}
-