This is an automated email from the ASF dual-hosted git repository. eshu11 pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new e775d2c GEODE-6320: Only save client transactions in failoverMap on far side. (#3122) e775d2c is described below commit e775d2ccb71562c9a49d2e6343244c3c9e37a4b7 Author: pivotal-eshu <e...@pivotal.io> AuthorDate: Fri Jan 25 08:42:49 2019 -0800 GEODE-6320: Only save client transactions in failoverMap on far side. (#3122) --- .../TransactionCommitOnFarSideDistributedTest.java | 181 +++++++++++++++++++++ .../geode/internal/cache/TXCommitMessage.java | 12 +- .../geode/internal/cache/TXFarSideCMTracker.java | 6 + .../apache/geode/internal/cache/TXManagerImpl.java | 5 + 4 files changed, 198 insertions(+), 6 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TransactionCommitOnFarSideDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TransactionCommitOnFarSideDistributedTest.java new file mode 100644 index 0000000..0b23377 --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TransactionCommitOnFarSideDistributedTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.apache.geode.test.dunit.VM.getHostName; +import static org.apache.geode.test.dunit.VM.getVM; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.CacheTransactionManager; +import org.apache.geode.cache.PartitionAttributes; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientRegionFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.client.PoolFactory; +import org.apache.geode.cache.client.PoolManager; +import org.apache.geode.cache.client.internal.PoolImpl; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.rules.CacheRule; +import org.apache.geode.test.dunit.rules.ClientCacheRule; +import org.apache.geode.test.dunit.rules.DistributedRule; +import org.apache.geode.test.junit.rules.serializable.SerializableTestName; + +public class TransactionCommitOnFarSideDistributedTest implements Serializable { + private String hostName; + private String uniqueName; + private String regionName; + private VM server1; + private VM server2; + private VM server3; + private VM server4; + private int port1; + private final String key = "key"; + private final String value = "value"; + private final String newValue = "newValue"; + + @Rule + public DistributedRule distributedRule = new DistributedRule(); + + @Rule + public CacheRule cacheRule = new CacheRule(); + + @Rule + public ClientCacheRule clientCacheRule = new ClientCacheRule(); + + @Rule + public SerializableTestName testName = new SerializableTestName(); + + @Before + public void setUp() { + server1 = getVM(0); + server2 = getVM(1); + server3 = getVM(2); + server4 = getVM(3); + + hostName = getHostName(); + uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName(); + regionName = uniqueName + "_region"; + } + + @Test + public void farSideFailoverMapDoesNotSaveTransactionsInitiatedFromServer() { + port1 = server1.invoke(() -> createServerRegion(1, false, 2)); + server1.invoke(() -> { + Region region = cacheRule.getCache().getRegion(regionName); + region.put(key, value); + }); + server2.invoke(() -> createServerRegion(1, false, 2)); + server3.invoke(() -> createServerRegion(1, false, 2)); + + server1.invoke(() -> { + TXManagerImpl txManager = cacheRule.getCache().getTxManager(); + txManager.begin(); + Region region = cacheRule.getCache().getRegion(regionName); + region.put(key, newValue); + txManager.commit(); + assertThat(region.get(key)).isEqualTo(newValue); + assertThat(txManager.getFailoverMapSize()).isZero(); + }); + server2.invoke(() -> verifyFarSideFailoverMapSizeAfterCommit(0)); + server3.invoke(() -> verifyFarSideFailoverMapSizeAfterCommit(0)); + } + + private void verifyFarSideFailoverMapSizeAfterCommit(int expectedValue) { + Region region = cacheRule.getCache().getRegion(regionName); + assertThat(region.get(key)).isEqualTo(newValue); + assertThat(TXCommitMessage.getTracker().getFailoverMapSize()).isEqualTo(expectedValue); + } + + @Test + public void farSideFailoverMapSavesTransactionsInitiatedFromClient() { + VM client = server4; + port1 = server1.invoke(() -> createServerRegion(1, false, 2)); + server1.invoke(() -> { + Region region = cacheRule.getCache().getRegion(regionName); + region.put(key, value); + }); + server2.invoke(() -> createServerRegion(1, false, 2)); + server3.invoke(() -> createServerRegion(1, false, 2)); + + client.invoke(() -> createClientRegion(true, port1)); + client.invoke(() -> { + CacheTransactionManager transactionManager = + clientCacheRule.getClientCache().getCacheTransactionManager(); + transactionManager.begin(); + Region region = clientCacheRule.getClientCache().getRegion(regionName); + region.put(key, newValue); + transactionManager.commit(); + }); + + server1.invoke(() -> { + TXManagerImpl txManager = cacheRule.getCache().getTxManager(); + Region region = cacheRule.getCache().getRegion(regionName); + assertThat(region.get(key)).isEqualTo(newValue); + assertThat(txManager.getFailoverMapSize()).isEqualTo(1); + }); + server2.invoke(() -> verifyFarSideFailoverMapSizeAfterCommit(1)); + server3.invoke(() -> verifyFarSideFailoverMapSizeAfterCommit(1)); + } + + private int createServerRegion(int totalNumBuckets, boolean isAccessor, int redundancy) + throws Exception { + PartitionAttributesFactory factory = new PartitionAttributesFactory(); + factory.setTotalNumBuckets(totalNumBuckets).setRedundantCopies(redundancy); + if (isAccessor) { + factory.setLocalMaxMemory(0); + } + PartitionAttributes partitionAttributes = factory.create(); + cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION) + .setPartitionAttributes(partitionAttributes).create(regionName); + + TXManagerImpl txManager = cacheRule.getCache().getTxManager(); + txManager.setTransactionTimeToLiveForTest(4); + + CacheServer server = cacheRule.getCache().addCacheServer(); + server.setPort(0); + server.start(); + return server.getPort(); + } + + private void createClientRegion(boolean connectToFirstPort, int... ports) { + clientCacheRule.createClientCache(); + + PoolImpl pool = getPool(ports); + ClientRegionFactory crf = + clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL); + crf.setPoolName(pool.getName()); + crf.create(regionName); + + } + + private PoolImpl getPool(int... ports) { + PoolFactory factory = PoolManager.createFactory(); + for (int port : ports) { + factory.addServer(hostName, port); + } + factory.setReadTimeout(12000).setSocketBufferSize(1000); + + return (PoolImpl) factory.create(uniqueName); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java index 4b8d1d6..79e55ca 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java @@ -108,9 +108,9 @@ public class TXCommitMessage extends PooledDistributionMessage * List of operations to do when processing this tx. Valid on farside only. */ protected transient ArrayList farSideEntryOps; - private transient byte[] farsideBaseMembershipId; // only available on farside - private transient long farsideBaseThreadId; // only available on farside - private transient long farsideBaseSequenceId; // only available on farside + private byte[] farsideBaseMembershipId; // only available on farside + private long farsideBaseThreadId; // only available on farside + private long farsideBaseSequenceId; // only available on farside /** * (Nearside) true of any regions in this TX have required roles @@ -118,11 +118,11 @@ public class TXCommitMessage extends PooledDistributionMessage private transient boolean hasReliableRegions = false; /** - * Set of all caching exceptions produced hile processing this tx + * Set of all caching exceptions produced while processing this tx */ private transient Set processingExceptions = Collections.emptySet(); - private transient ClientProxyMembershipID bridgeContext = null; + private ClientProxyMembershipID bridgeContext = null; /** * Version of the client that this TXCommitMessage is being sent to. Used for backwards @@ -687,7 +687,7 @@ public class TXCommitMessage extends PooledDistributionMessage if (isAckRequired()) { ack(); } - if (!dm.getExistingCache().isClient()) { + if (!dm.getExistingCache().isClient() && bridgeContext != null) { getTracker().saveTXForClientFailover(txIdent, this); } if (logger.isDebugEnabled()) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXFarSideCMTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXFarSideCMTracker.java index d8a9757..0fb2404 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXFarSideCMTracker.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXFarSideCMTracker.java @@ -26,6 +26,7 @@ import java.util.Set; import org.apache.logging.log4j.Logger; +import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.MembershipListener; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; @@ -360,4 +361,9 @@ public class TXFarSideCMTracker { this.lastHistoryItem = 0; Arrays.fill(this.txHistory, null); } + + @VisibleForTesting + public int getFailoverMapSize() { + return failoverMap.size(); + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java index 45794fb..fef7831 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java @@ -1959,4 +1959,9 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene return scheduledToBeRemovedTx; } + @VisibleForTesting + public int getFailoverMapSize() { + return failoverMap.size(); + } + }