Repository: incubator-geode Updated Branches: refs/heads/develop 16d09f659 -> 2deb31d95
GEODE-1160: TransactionWriter is triggered when updating entries using PDX. This change inhibits invocation of transaction writers and listeners for operations on internal cache Regions. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/2deb31d9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/2deb31d9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/2deb31d9 Branch: refs/heads/develop Commit: 2deb31d95f1f123bd1091f5f527ba0de1e175741 Parents: 16d09f6 Author: Bruce Schuchardt <bschucha...@pivotal.io> Authored: Mon Apr 4 08:47:39 2016 -0700 Committer: Bruce Schuchardt <bschucha...@pivotal.io> Committed: Mon Apr 4 08:49:23 2016 -0700 ---------------------------------------------------------------------- .../gemfire/internal/cache/TXCommitMessage.java | 18 ++-- .../gemfire/internal/cache/TXEvent.java | 21 ++++ .../gemfire/internal/cache/TXRmtEvent.java | 28 +++++ .../gemfire/internal/cache/TXState.java | 12 ++- .../gemfire/pdx/PdxSerializableDUnitTest.java | 106 +++++++++++++++++++ 5 files changed, 174 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2deb31d9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java index 1a4d377..d1644c7 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java @@ -699,7 +699,9 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member /* * We need to make sure that we should fire a TX 
afterCommit event. */ - if(!disableListeners && (forceListener || (txEvent!=null && txEvent.getEvents().size()>0))) { + boolean internalEvent = (txEvent != null && txEvent.hasOnlyInternalEvents()); + if (!disableListeners && !internalEvent + && (forceListener || (txEvent!=null && !txEvent.isEmpty()))) { for (int i=0; i < tls.length; i++) { try { tls[i].afterCommit(txEvent); @@ -2457,13 +2459,13 @@ private static final long serialVersionUID = 589384721273797822L; } - /** - * Disable firing of TX Listeners. Currently on used on clients. - * @param b disable the listeners - */ - public void setDisableListeners(boolean b) { - disableListeners = true; - } + /** + * Disable firing of TX Listeners. Currently on used on clients. + * @param b disable the listeners + */ + public void setDisableListeners(boolean b) { + disableListeners = true; + } public Version getClientVersion() { return clientVersion; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2deb31d9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEvent.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEvent.java index e686cea..6bfd26f 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEvent.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEvent.java @@ -128,6 +128,27 @@ public class TXEvent implements TransactionEvent, Releasable { return this.events; } + /** + * Do all operations touch internal regions? + * Returns false if the transaction is empty + * or if any events touch non-internal regions. 
+ */ + public boolean hasOnlyInternalEvents() { + List<CacheEvent<?,?>> txevents = getEvents(); + if (txevents == null || txevents.isEmpty()) { + return false; + } + for (CacheEvent<?,?> txevent: txevents) { + LocalRegion region = (LocalRegion)txevent.getRegion(); + if (region != null + && !region.isPdxTypesRegion() + && !region.isInternalRegion()) { + return false; + } + } + return true; + } + public final Cache getCache() { return this.cache; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2deb31d9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java index b378c8e..c0493ac 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java @@ -83,6 +83,30 @@ public class TXRmtEvent implements TransactionEvent } } } + + /** + * Do all operations touch internal regions? + * Returns false if the transaction is empty + * or if any events touch non-internal regions. 
+ */ + public boolean hasOnlyInternalEvents() { + if (events == null || events.isEmpty()) { + return false; + } + Iterator<CacheEvent<?,?>> it = this.events.iterator(); + while (it.hasNext()) { + CacheEvent<?,?> event = it.next(); + if (isEventUserVisible(event)) { + LocalRegion region = (LocalRegion)event.getRegion(); + if (region != null + && !region.isPdxTypesRegion() + && !region.isInternalRegion()) { + return false; + } + } + } + return true; + } public List getCreateEvents() { @@ -175,6 +199,10 @@ public class TXRmtEvent implements TransactionEvent } } } + + public boolean isEmpty() { + return (events == null) || events.isEmpty(); + } private EntryEventImpl createEvent(LocalRegion r, Operation op, RegionEntry re, Object key, Object newValue,Object aCallbackArgument) http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2deb31d9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java index 9e8dd18..3bec397 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java @@ -388,7 +388,10 @@ public class TXState implements TXStateInterface { if(!firedWriter && writer!=null) { try { firedWriter = true; - writer.beforeCommit(getEvent()); + TXEvent event = getEvent(); + if (!event.hasOnlyInternalEvents()) { + writer.beforeCommit(event); + } } catch(TransactionWriterException twe) { cleanup(); throw new CommitConflictException(twe); @@ -917,7 +920,10 @@ public class TXState implements TXStateInterface { try { // need to mark this so we don't fire again in commit firedWriter = true; - writer.beforeCommit(getEvent()); + TXEvent event = getEvent(); + if (!event.hasOnlyInternalEvents()) { + 
writer.beforeCommit(event); + } } catch(TransactionWriterException twe) { cleanup(); throw new CommitConflictException(twe); @@ -1623,7 +1629,7 @@ public class TXState implements TXStateInterface { public boolean isFireCallbacks() { - return true; + return !getEvent().hasOnlyInternalEvents(); } public boolean isOriginRemoteForEvents() { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2deb31d9/geode-core/src/test/java/com/gemstone/gemfire/pdx/PdxSerializableDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/pdx/PdxSerializableDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/pdx/PdxSerializableDUnitTest.java index 1e901bc..3621016 100644 --- a/geode-core/src/test/java/com/gemstone/gemfire/pdx/PdxSerializableDUnitTest.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/pdx/PdxSerializableDUnitTest.java @@ -16,13 +16,23 @@ */ package com.gemstone.gemfire.pdx; +import java.util.List; + import com.gemstone.gemfire.cache.AttributesFactory; import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.CacheEvent; import com.gemstone.gemfire.cache.CacheFactory; import com.gemstone.gemfire.cache.DataPolicy; import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.Scope; +import com.gemstone.gemfire.cache.TransactionEvent; +import com.gemstone.gemfire.cache.TransactionListener; +import com.gemstone.gemfire.cache.TransactionWriter; +import com.gemstone.gemfire.cache.TransactionWriterException; import com.gemstone.gemfire.cache30.CacheTestCase; +import com.gemstone.gemfire.cache30.TestTransactionListener; +import com.gemstone.gemfire.internal.cache.LocalRegion; +import com.gemstone.gemfire.internal.cache.TXEvent; import com.gemstone.gemfire.pdx.internal.PeerTypeRegistration; import com.gemstone.gemfire.test.dunit.AsyncInvocation; import com.gemstone.gemfire.test.dunit.Host; @@ -83,6 +93,48 @@ public class 
PdxSerializableDUnitTest extends CacheTestCase { }); } + public void testTransactionCallbacksNotInvoked() { + Host host = Host.getHost(0); + VM vm1 = host.getVM(0); + VM vm2 = host.getVM(1); + + SerializableCallable createRegionAndAddPoisonedListener = new SerializableCallable() { + public Object call() throws Exception { + AttributesFactory af = new AttributesFactory(); + af.setScope(Scope.DISTRIBUTED_ACK); + af.setDataPolicy(DataPolicy.REPLICATE); + createRootRegion("testSimplePdx", af.create()); + addPoisonedTransactionListeners(); + return null; + } + }; + + vm1.invoke(createRegionAndAddPoisonedListener); + vm2.invoke(createRegionAndAddPoisonedListener); + vm1.invoke(new SerializableCallable() { + public Object call() throws Exception { + //Check to make sure the type region is not yet created + Region r = getRootRegion("testSimplePdx"); + Cache mycache = getCache(); + mycache.getCacheTransactionManager().begin(); + r.put(1, new SimpleClass(57, (byte) 3)); + mycache.getCacheTransactionManager().commit(); + //Ok, now the type registry should exist + assertNotNull(getRootRegion(PeerTypeRegistration.REGION_NAME)); + return null; + } + }); + vm2.invoke(new SerializableCallable() { + public Object call() throws Exception { + //Ok, now the type registry should exist + assertNotNull(getRootRegion(PeerTypeRegistration.REGION_NAME)); + Region r = getRootRegion("testSimplePdx"); + assertEquals(new SimpleClass(57, (byte) 3), r.get(1)); + return null; + } + }); + } + public void testPersistenceDefaultDiskStore() throws Throwable { SerializableCallable createRegion = new SerializableCallable() { @@ -187,4 +239,58 @@ public class PdxSerializableDUnitTest extends CacheTestCase { vm3.invoke(checkForObject); } + + /** + * add a listener and writer that will throw an exception when invoked + * if events are for internal regions + */ + public final void addPoisonedTransactionListeners() { + MyTestTransactionListener listener = new MyTestTransactionListener(); + 
getCache().getCacheTransactionManager().addListener(listener); + getCache().getCacheTransactionManager().setWriter(listener); + } + + + static private class MyTestTransactionListener + implements TransactionWriter, TransactionListener { + private MyTestTransactionListener() { + + } + + private void checkEvent(TransactionEvent event) { + List<CacheEvent<?,?>> events = event.getEvents(); + System.out.println("MyTestTransactionListener.checkEvent: events are " + events); + for (CacheEvent<?,?> cacheEvent: events) { + if (((LocalRegion)cacheEvent.getRegion()).isPdxTypesRegion()) { + throw new UnsupportedOperationException("found internal event: " + cacheEvent + + " region=" + cacheEvent.getRegion().getName()); + } + } + } + + @Override + public void beforeCommit(TransactionEvent event) + throws TransactionWriterException { + checkEvent(event); + } + + @Override + public void close() { + } + + @Override + public void afterCommit(TransactionEvent event) { + checkEvent(event); + } + + @Override + public void afterFailedCommit(TransactionEvent event) { + checkEvent(event); + } + + @Override + public void afterRollback(TransactionEvent event) { + checkEvent(event); + } + } }