This is an automated email from the ASF dual-hosted git repository. mkevo pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 4cb75ae484 GEODE-10395 remove locks from List if dlock.acquireTryLocks return false (#7846) 4cb75ae484 is described below commit 4cb75ae4848250606db2f4b14300601755586192 Author: Mario Kevo <48509719+mk...@users.noreply.github.com> AuthorDate: Tue Sep 20 19:04:08 2022 +0200 GEODE-10395 remove locks from List if dlock.acquireTryLocks return false (#7846) --- .../internal/cache/locks/TXLockServiceImpl.java | 26 ++++++-- .../internal/StartupMessageJUnitTest.java | 4 +- .../cache/locks/TXLockServiceImplTest.java | 71 ++++++++++++++++++++++ 3 files changed, 95 insertions(+), 6 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java index f7e7aebe0f..44b9c9f440 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java @@ -22,6 +22,7 @@ import java.util.Set; import org.apache.logging.log4j.Logger; +import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CommitConflictException; import org.apache.geode.distributed.internal.InternalDistributedSystem; @@ -53,7 +54,7 @@ public class TXLockServiceImpl extends TXLockService { /** * List of active txLockIds */ - protected List txLockIdList = new ArrayList(); + protected final List<TXLockId> txLockIdList = new ArrayList<>(); /** * True if grantor recovery is in progress; used to keep <code>release</code> from waiting for @@ -70,6 +71,14 @@ public class TXLockServiceImpl extends TXLockService { /** The distributed system for cancellation checks. */ private final InternalDistributedSystem system; + @VisibleForTesting + TXLockServiceImpl(InternalDistributedSystem sys, StoppableReentrantReadWriteLock recoveryLock, + DLockService dlock) { + system = sys; + this.recoveryLock = recoveryLock; + this.dlock = dlock; + } + TXLockServiceImpl(String name, InternalDistributedSystem sys) { if (sys == null) { throw new IllegalStateException( @@ -129,10 +138,16 @@ public class TXLockServiceImpl extends TXLockService { if (gotLocks) { // ...otherwise race can occur between tryLocks and readLock acquireRecoveryReadLock(); } else if (keyIfFail[0] != null) { + synchronized (txLockIdList) { + txLockIdList.remove(txLockId); + } throw new CommitConflictException( String.format("Concurrent transaction commit detected %s", keyIfFail[0])); } else { + synchronized (txLockIdList) { + txLockIdList.remove(txLockId); + } throw new CommitConflictException( String.format("Failed to request try locks from grantor: %s", dlock.getLockGrantorId())); @@ -225,9 +240,7 @@ public class TXLockServiceImpl extends TXLockService { txLockId)); } - dlock.releaseTryLocks(txLockId, () -> { - return recovering; - }); + dlock.releaseTryLocks(txLockId, () -> recovering); txLockIdList.remove(txLockId); releaseRecoveryReadLock(); @@ -277,4 +290,9 @@ public class TXLockServiceImpl extends TXLockService { dlock.destroyAndRemove(); } + @VisibleForTesting + public int getTxLockIdList() { + return this.txLockIdList.size(); + } + } diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/StartupMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/StartupMessageJUnitTest.java index d017b96ad9..b51453a3a4 100644 --- a/geode-core/src/test/java/org/apache/geode/distributed/internal/StartupMessageJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/StartupMessageJUnitTest.java @@ -95,7 +95,7 @@ public class StartupMessageJUnitTest { startupMessage.process(distributionManager); assertThat( - startupMessage.getProcessorType() == OperationExecutors.WAITING_POOL_EXECUTOR); + startupMessage.getProcessorType()).isEqualTo(OperationExecutors.WAITING_POOL_EXECUTOR); } @Test @@ -111,6 +111,6 @@ public class StartupMessageJUnitTest { assertThat( startupResponseMessage - .getProcessorType() == OperationExecutors.WAITING_POOL_EXECUTOR); + .getProcessorType()).isEqualTo(OperationExecutors.WAITING_POOL_EXECUTOR); } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceImplTest.java new file mode 100644 index 0000000000..e4f8a7b01b --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceImplTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.locks; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.geode.cache.CommitConflictException; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.locks.DLockService; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock; + +public class TXLockServiceImplTest { + private TXLockServiceImpl txLockService; + private InternalDistributedSystem internalDistributedSystem; + private DLockService dlock; + private List distLocks; + private Set otherMembers; + private DistributionManager distributionManager; + private InternalDistributedMember distributedMember; + private StoppableReentrantReadWriteLock recoverylock; + + @Before + public void setUp() { + internalDistributedSystem = mock(InternalDistributedSystem.class); + dlock = mock(DLockService.class); + distributionManager = mock(DistributionManager.class); + distributedMember = mock(InternalDistributedMember.class); + recoverylock = mock(StoppableReentrantReadWriteLock.class); + } + + @Test + public void testTxLockService() { + distLocks = new ArrayList(); + txLockService = new TXLockServiceImpl(internalDistributedSystem, recoverylock, dlock); + + when(dlock.getDistributionManager()).thenReturn(distributionManager); + when(dlock.getDistributionManager().getId()).thenReturn(distributedMember); + + assertThat((txLockService).getTxLockIdList()).isEqualTo(0); + + assertThrows(CommitConflictException.class, + () -> txLockService.txLock(distLocks, otherMembers)); + + assertThat((txLockService).getTxLockIdList()).isEqualTo(0); + } +}