This is an automated email from the ASF dual-hosted git repository. jasonhuynh pushed a commit to branch support/1.13 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 4721b164501bba82309041522172276ed3042f4a Author: Jianxia Chen <[email protected]> AuthorDate: Fri Jul 10 14:42:01 2020 -0700 GEODE-8200: Rebalance operations stuck in "IN_PROGRESS" state forever (#5350) Record the locator that issues the original Rest API request. If the locator is offline afterwards, report an error. Co-authored-by: Jianxia Chen <[email protected]> Co-authored-by: Jinmei Liao <[email protected]> (cherry picked from commit 426b9de66ae9adadde8f1994b2340fc9de809d81) --- .../management/OperationManagementUpgradeTest.java | 123 +++++++++++++++++++++ .../api/LocatorClusterManagementService.java | 3 +- .../operation/OperationHistoryManager.java | 35 +++++- .../internal/operation/OperationManager.java | 3 +- .../internal/operation/OperationState.java | 17 ++- .../internal/operation/OperationStateStore.java | 2 +- .../operation/RegionOperationStateStore.java | 4 +- .../sanctioned-geode-core-serializables.txt | 2 +- .../api/LocatorClusterManagementServiceTest.java | 32 ++++++ .../operation/OperationHistoryManagerTest.java | 35 +++++- .../internal/operation/OperationManagerTest.java | 30 +++-- .../internal/operation/OperationStateTest.java | 8 ++ .../operation/RegionOperationStateStoreTest.java | 6 +- 13 files changed, 262 insertions(+), 38 deletions(-) diff --git a/geode-assembly/src/upgradeTest/java/org/apache/geode/management/OperationManagementUpgradeTest.java b/geode-assembly/src/upgradeTest/java/org/apache/geode/management/OperationManagementUpgradeTest.java new file mode 100644 index 0000000..6121a50 --- /dev/null +++ b/geode-assembly/src/upgradeTest/java/org/apache/geode/management/OperationManagementUpgradeTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.management; + +import static org.apache.geode.test.dunit.Host.getHost; +import static org.apache.geode.test.junit.rules.gfsh.GfshRule.startLocatorCommand; +import static org.apache.geode.test.junit.rules.gfsh.GfshRule.startServerCommand; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Collection; +import java.util.List; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.geode.internal.AvailablePortHelper; +import org.apache.geode.management.api.ClusterManagementOperationResult; +import org.apache.geode.management.api.ClusterManagementResult; +import org.apache.geode.management.api.ClusterManagementService; +import org.apache.geode.management.client.ClusterManagementServiceBuilder; +import org.apache.geode.management.operation.RebalanceOperation; +import org.apache.geode.management.runtime.RebalanceResult; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.internal.DUnitLauncher; +import org.apache.geode.test.junit.categories.BackwardCompatibilityTest; +import org.apache.geode.test.junit.rules.gfsh.GfshExecution; +import org.apache.geode.test.junit.rules.gfsh.GfshRule; +import org.apache.geode.test.junit.rules.gfsh.GfshScript; +import 
org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory; +import org.apache.geode.test.version.TestVersion; +import org.apache.geode.test.version.VersionManager; + +@Category({BackwardCompatibilityTest.class}) +@RunWith(Parameterized.class) +@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class) +public class OperationManagementUpgradeTest { + private final String oldVersion; + private VM vm; + + @Parameterized.Parameters(name = "{0}") + public static Collection<String> data() { + List<String> result = VersionManager.getInstance().getVersionsWithoutCurrent(); + result.removeIf(s -> TestVersion.compare(s, "1.13.0") < 0); + return result; + } + + public OperationManagementUpgradeTest(String version) { + oldVersion = version; + oldGfsh = new GfshRule(oldVersion); + DUnitLauncher.launchIfNeeded(false); + // get the vm with the same version of the oldGfsh + vm = getHost(0).getVM(oldVersion, 0); + } + + @Rule + public GfshRule oldGfsh; + + @Rule + public GfshRule gfsh = new GfshRule(); + + @Test + public void newLocatorCanReadOldConfigurationData() { + int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(7); + int locatorPort1 = ports[0]; + int jmxPort1 = ports[1]; + int httpPort1 = ports[2]; + int locatorPort2 = ports[3]; + int jmxPort2 = ports[4]; + int httpPort2 = ports[5]; + int serverPort = ports[6]; + GfshExecution execute = + GfshScript.of(startLocatorCommand("locator1", locatorPort1, jmxPort1, httpPort1, 0)) + .and(startLocatorCommand("locator2", locatorPort2, jmxPort2, httpPort2, locatorPort1)) + .and(startServerCommand("server", serverPort, locatorPort1)) + .execute(oldGfsh); + + String operationId = vm.invoke(() -> { + // start a cms client that connects to locator1's http port + ClusterManagementService cms = new ClusterManagementServiceBuilder() + .setHost("localhost") + .setPort(httpPort1) + .build(); + + ClusterManagementOperationResult<RebalanceOperation, RebalanceResult> startResult = + cms.start(new 
RebalanceOperation()); + assertThat(startResult.getStatusCode()) + .isEqualTo(ClusterManagementResult.StatusCode.ACCEPTED); + return startResult.getOperationId(); + }); + + // stop locator1 + oldGfsh.stopLocator(execute, "locator1"); + // use new gfsh to start locator1, make sure new locator can start + GfshScript.of(startLocatorCommand("locator1", locatorPort1, jmxPort1, httpPort1, locatorPort2)) + .execute(gfsh, execute.getWorkingDir()); + + // use the new cms client + ClusterManagementService cms = new ClusterManagementServiceBuilder() + .setHost("localhost") + .setPort(httpPort1) + .build(); + ClusterManagementOperationResult<RebalanceOperation, RebalanceResult> operationResult = + cms.get(new RebalanceOperation(), operationId); + System.out.println(operationResult); + assertThat(operationResult.getStatusCode()).isEqualTo(ClusterManagementResult.StatusCode.OK); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/api/LocatorClusterManagementService.java b/geode-core/src/main/java/org/apache/geode/management/internal/api/LocatorClusterManagementService.java index f20bc36..5256359 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/api/LocatorClusterManagementService.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/api/LocatorClusterManagementService.java @@ -122,7 +122,7 @@ public class LocatorClusterManagementService implements ClusterManagementService this(cache, persistenceService, new ConcurrentHashMap<>(), new ConcurrentHashMap<>(), new MemberValidator(cache, persistenceService), new CommonConfigurationValidator(), new OperationManager(cache, - new OperationHistoryManager(new RegionOperationStateStore(cache)))); + new OperationHistoryManager(new RegionOperationStateStore(cache), cache))); // initialize the list of managers managers.put(Region.class, new RegionConfigManager(persistenceService)); managers.put(Pdx.class, new PdxManager(persistenceService)); @@ -484,6 +484,7 @@ public 
class LocatorClusterManagementService implements ClusterManagementService if (operationState == null) { raise(StatusCode.ENTITY_NOT_FOUND, "Operation '" + opId + "' does not exist."); } + return toClusterManagementOperationResult(operationState); } diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationHistoryManager.java b/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationHistoryManager.java index 5f09252..5873ee2 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationHistoryManager.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationHistoryManager.java @@ -22,6 +22,7 @@ import java.util.stream.Collectors; import org.apache.geode.annotations.Experimental; import org.apache.geode.annotations.VisibleForTesting; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.management.api.ClusterManagementOperation; import org.apache.geode.management.runtime.OperationResult; @@ -35,22 +36,25 @@ import org.apache.geode.management.runtime.OperationResult; public class OperationHistoryManager { private final long keepCompletedMillis; private final OperationStateStore operationStateStore; + private final InternalCache cache; /** - * set a default retention policy to keep results for 2 hours after completion + * set a default retention policy to keep results for 7 days after completion */ public OperationHistoryManager( - OperationStateStore operationStateStore) { - this(Duration.ofDays(7), operationStateStore); + OperationStateStore operationStateStore, InternalCache cache) { + this(Duration.ofDays(7), operationStateStore, cache); } /** * set a custom retention policy to keep results for X amount of time after completion */ public OperationHistoryManager(Duration keepCompleted, - OperationStateStore operationStateStore) { + OperationStateStore operationStateStore, + InternalCache cache) { keepCompletedMillis 
= keepCompleted.toMillis(); this.operationStateStore = operationStateStore; + this.cache = cache; } /** @@ -69,6 +73,7 @@ public class OperationHistoryManager { final long expirationTime = now() - keepCompletedMillis; Set<String> expiredKeys = operationStateStore.list() .stream() + .map(this::validateLocator) .filter(operationInstance -> isExpired(expirationTime, operationInstance)) .map(OperationState::getId) .collect(Collectors.toSet()); @@ -90,13 +95,31 @@ public class OperationHistoryManager { return operationEnd.getTime() <= expirationTime; } + private OperationState<?, ?> validateLocator(OperationState<?, ?> operationState) { + if (isLocatorOffline(operationState)) { + operationState.setOperationEnd(new Date(), null, + new RuntimeException("Locator that initiated the Rest API operation is offline: " + + operationState.getLocator())); + } + + return operationState; + } + + private boolean isLocatorOffline(OperationState operationState) { + return operationState.getOperationEnd() == null + && (operationState.getLocator() != null) + && !cache.getMyId().toString().equals(operationState.getLocator()) + && (!cache.getDistributedSystem().getAllOtherMembers().stream().map(Object::toString) + .collect(Collectors.toSet()).contains(operationState.getLocator())); + } + /** * Stores a new operation in the history and returns its unique identifier. 
*/ - public String recordStart(ClusterManagementOperation<?> op) { + public String recordStart(ClusterManagementOperation<?> op, String locator) { expireHistory(); - return operationStateStore.recordStart(op); + return operationStateStore.recordStart(op, locator); } /** diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationManager.java b/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationManager.java index c5c7027..6dd1260 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationManager.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationManager.java @@ -63,12 +63,11 @@ public class OperationManager implements AutoCloseable { op.getClass().getSimpleName())); } - String opId = historyManager.recordStart(op); + String opId = historyManager.recordStart(op, cache.getMyId().toString()); // get the operationState BEFORE we start the async thread // so that start will return a result that is not influenced // by how far the async thread gets in its execution. OperationState<A, V> operationState = historyManager.get(opId); - CompletableFuture.supplyAsync(() -> performer.perform(cache, op), executor) .whenComplete((result, exception) -> { Throwable cause = exception == null ? 
null : exception.getCause(); diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationState.java b/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationState.java index a7cb8ba..4c944b5 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationState.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationState.java @@ -28,12 +28,25 @@ import org.apache.geode.management.runtime.OperationResult; */ public class OperationState<A extends ClusterManagementOperation<V>, V extends OperationResult> implements Identifiable<String> { + private static final long serialVersionUID = 8212319653561969588L; private final String opId; private final A operation; private final Date operationStart; private Date operationEnd; private V result; private Throwable throwable; + private String locator; + + public String getLocator() { + return this.locator; + } + + public void setLocator( + String locator) { + synchronized (this) { + this.locator = locator; + } + } @Override public boolean equals(Object o) { @@ -49,7 +62,8 @@ public class OperationState<A extends ClusterManagementOperation<V>, V extends O Objects.equals(getOperationStart(), that.getOperationStart()) && Objects.equals(getOperationEnd(), that.getOperationEnd()) && Objects.equals(getResult(), that.getResult()) && - Objects.equals(getThrowable(), that.getThrowable()); + Objects.equals(getThrowable(), that.getThrowable()) && + Objects.equals(getLocator(), that.getLocator()); } @Override @@ -97,6 +111,7 @@ public class OperationState<A extends ClusterManagementOperation<V>, V extends O result.operationEnd = this.operationEnd; result.result = this.result; result.throwable = this.throwable; + result.locator = this.locator; } return result; } diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationStateStore.java 
b/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationStateStore.java index aea2523..d2b8c1c 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationStateStore.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationStateStore.java @@ -34,7 +34,7 @@ public interface OperationStateStore { * * @return an identifier for the state of the operation */ - <A extends ClusterManagementOperation<?>> String recordStart(A operation); + <A extends ClusterManagementOperation<?>> String recordStart(A operation, String locator); /** * Returns a single instance of an {@link OperationState} for a given diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/operation/RegionOperationStateStore.java b/geode-core/src/main/java/org/apache/geode/management/internal/operation/RegionOperationStateStore.java index 4bd2559..8bc51cc 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/operation/RegionOperationStateStore.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/operation/RegionOperationStateStore.java @@ -70,10 +70,12 @@ public class RegionOperationStateStore } @Override - public <A extends ClusterManagementOperation<?>> String recordStart(A operation) { + public <A extends ClusterManagementOperation<?>> String recordStart(A operation, String locator) { String opId = uniqueIdSupplier.get(); OperationState operationInstance = new OperationState(opId, operation, new Date()); + operationInstance.setLocator(locator); + region.put(opId, operationInstance); return opId; diff --git a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt index 2d14263..e8b3ec3 100644 --- a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt +++ 
b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt @@ -456,7 +456,7 @@ org/apache/geode/management/internal/functions/CliFunctionResult$StatusState,fal org/apache/geode/management/internal/functions/GetMemberInformationFunction,true,1404642539058875565 org/apache/geode/management/internal/functions/RebalanceFunction,true,1 org/apache/geode/management/internal/functions/RestoreRedundancyFunction,true,-8991672237560920252 -org/apache/geode/management/internal/operation/OperationState,false,opId:java/lang/String,operation:org/apache/geode/management/api/ClusterManagementOperation,operationEnd:java/util/Date,operationStart:java/util/Date,result:org/apache/geode/management/runtime/OperationResult,throwable:java/lang/Throwable +org/apache/geode/management/internal/operation/OperationState,true,8212319653561969588,locator:java/lang/String,opId:java/lang/String,operation:org/apache/geode/management/api/ClusterManagementOperation,operationEnd:java/util/Date,operationStart:java/util/Date,result:org/apache/geode/management/runtime/OperationResult,throwable:java/lang/Throwable org/apache/geode/management/internal/web/domain/QueryParameterSource,true,34131123582155,objectName:javax/management/ObjectName,queryExpression:javax/management/QueryExp org/apache/geode/management/internal/web/shell/MBeanAccessException,true,813768898269516238 org/apache/geode/pdx/FieldType,false,defaultSerializedValue:java/nio/ByteBuffer,defaultValue:java/lang/Object,isFixedWidth:boolean,name:java/lang/String,width:int diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/api/LocatorClusterManagementServiceTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/api/LocatorClusterManagementServiceTest.java index 5a6f543..003d442 100644 --- a/geode-core/src/test/java/org/apache/geode/management/internal/api/LocatorClusterManagementServiceTest.java +++ 
b/geode-core/src/test/java/org/apache/geode/management/internal/api/LocatorClusterManagementServiceTest.java @@ -51,7 +51,10 @@ import org.apache.geode.cache.configuration.CacheConfig; import org.apache.geode.cache.configuration.GatewayReceiverConfig; import org.apache.geode.cache.configuration.RegionConfig; import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.distributed.internal.InternalConfigurationPersistenceService; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.config.JAXBService; import org.apache.geode.management.api.ClusterManagementException; @@ -90,6 +93,9 @@ public class LocatorClusterManagementServiceTest { private LocatorClusterManagementService service; private InternalCache cache; + private InternalDistributedMember member; + private DistributedSystem distributedSystem; + private DistributedMember distributedMember; private InternalConfigurationPersistenceService persistenceService; private Region regionConfig; private ClusterManagementResult result; @@ -108,6 +114,14 @@ public class LocatorClusterManagementServiceTest { JAXBService.create(CacheConfig.class))); cache = mock(InternalCache.class); + member = mock(InternalDistributedMember.class); + distributedSystem = mock(InternalDistributedSystem.class); + distributedMember = mock(InternalDistributedMember.class); + when(cache.getMyId()).thenReturn(member); + when(cache.getDistributedSystem()).thenReturn(distributedSystem); + Set<DistributedMember> members = new HashSet<>(); + members.add(distributedMember); + when(distributedSystem.getAllOtherMembers()).thenReturn(members); regionValidator = mock(RegionConfigValidator.class); doCallRealMethod().when(regionValidator).validate(eq(CacheElementOperation.DELETE), any()); 
regionManager = spy(new RegionConfigManager(persistenceService)); @@ -389,6 +403,8 @@ public class LocatorClusterManagementServiceTest { @Test public void getRebalance() { OperationState operationState = mock(OperationState.class); + String locator = member.toString(); + when(operationState.getLocator()).thenReturn(locator); when(operationManager.get(any())).thenReturn(operationState); ClusterManagementOperationResult<RebalanceOperation, RebalanceResult> result = service.get(rebalanceOperation, "456"); @@ -418,6 +434,22 @@ public class LocatorClusterManagementServiceTest { } @Test + public void getRebalanceStatusWhenLocatorIsOffline() { + OperationState operationState = mock(OperationState.class); + when(operationManager.get(any())).thenReturn(operationState); + when(operationState.getOperationEnd()).thenReturn(new Date()); + Throwable throwable = + new RuntimeException("Locator that initiated the Rest API operation is offline:"); + when(operationState.getThrowable()).thenReturn(throwable); + + ClusterManagementOperationResult result = service.get(rebalanceOperation, "456"); + + assertThat(result.getStatusCode()).isEqualTo(ClusterManagementResult.StatusCode.ERROR); + assertThat(result.getStatusMessage()) + .contains("Locator that initiated the Rest API operation is offline:"); + } + + @Test public void getRebalanceWithOperationResultThatFailedCorrectlySetsStatusMessage() { OperationState operationState = mock(OperationState.class); when(operationManager.get(any())).thenReturn(operationState); diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationHistoryManagerTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationHistoryManagerTest.java index 1d2791a..a5b9d4a 100644 --- a/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationHistoryManagerTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationHistoryManagerTest.java @@ 
-33,6 +33,9 @@ import java.util.List; import org.junit.Before; import org.junit.Test; +import org.apache.geode.distributed.DistributedSystem; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.management.api.ClusterManagementOperation; import org.apache.geode.management.runtime.OperationResult; @@ -43,7 +46,10 @@ public class OperationHistoryManagerTest { @Before public void setUp() throws Exception { operationStateStore = mock(OperationStateStore.class); - history = new OperationHistoryManager(Duration.ofHours(2), operationStateStore); + InternalCache cache = mock(InternalCache.class); + history = new OperationHistoryManager(Duration.ofHours(2), operationStateStore, cache); + when(cache.getMyId()).thenReturn(mock(InternalDistributedMember.class)); + when(cache.getDistributedSystem()).thenReturn(mock(DistributedSystem.class)); } @Test @@ -55,18 +61,20 @@ public class OperationHistoryManagerTest { public void recordStartReturnsExpectedOpId() { ClusterManagementOperation<?> op = mock(ClusterManagementOperation.class); String expectedOpId = "12345"; - when(operationStateStore.recordStart(same(op))).thenReturn(expectedOpId); + String locator = "locator"; + when(operationStateStore.recordStart(same(op), same(locator))).thenReturn(expectedOpId); - String opId = history.recordStart(op); + String opId = history.recordStart(op, locator); assertThat(opId).isSameAs(expectedOpId); } @Test public void recordStartDelegatesToPersistenceService() { ClusterManagementOperation<?> op = mock(ClusterManagementOperation.class); + String locator = "locator"; - String opId = history.recordStart(op); - verify(operationStateStore).recordStart(same(op)); + history.recordStart(op, locator); + verify(operationStateStore).recordStart(same(op), same(locator)); } @Test @@ -97,6 +105,21 @@ public class OperationHistoryManagerTest { } @Test + public void rebalanceLocatorIsOffline() { + 
OperationState<?, ?> operationState = new OperationState("opid", null, new Date()); + operationState.setLocator("locator"); + List<OperationState<?, ?>> ops = new ArrayList<>(); + ops.add(operationState); + doReturn(ops).when(operationStateStore).list(); + + history.expireHistory(); + + assertThat(operationState.getOperationEnd()).isNotNull(); + assertThat(operationState.getThrowable().getMessage()) + .contains("Locator that initiated the Rest API operation is offline:"); + } + + @Test public void expireHistoryRetainsUnexpiredCompletedOperations() { List<OperationState<ClusterManagementOperation<OperationResult>, OperationResult>> sampleOps = new ArrayList<>(); @@ -169,7 +192,7 @@ public class OperationHistoryManagerTest { public void recordStartCallsExpireHistory() { OperationHistoryManager historySpy = spy(history); - historySpy.recordStart(null); + historySpy.recordStart(null, null); verify(historySpy).expireHistory(); } diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationManagerTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationManagerTest.java index 9596f72..48f01dd 100644 --- a/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationManagerTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationManagerTest.java @@ -33,6 +33,7 @@ import java.util.concurrent.CountDownLatch; import org.junit.Before; import org.junit.Test; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.management.api.ClusterManagementOperation; import org.apache.geode.management.runtime.OperationResult; @@ -46,7 +47,8 @@ public class OperationManagerTest { public void setUp() throws Exception { operationHistoryManager = mock(OperationHistoryManager.class); cache = mock(InternalCache.class); - + InternalDistributedMember member = 
mock(InternalDistributedMember.class); + when(cache.getMyId()).thenReturn(member); executorManager = new OperationManager(cache, operationHistoryManager); } @@ -74,7 +76,7 @@ public class OperationManagerTest { executorManager.registerOperation( (Class<ClusterManagementOperation<OperationResult>>) operation.getClass(), performer); - when(operationHistoryManager.recordStart(operation)).thenReturn(opId); + when(operationHistoryManager.recordStart(eq(operation), any())).thenReturn(opId); when(operationHistoryManager.get(opId)).thenReturn(expectedOpState); OperationState<ClusterManagementOperation<OperationResult>, OperationResult> operationState = @@ -94,14 +96,12 @@ public class OperationManagerTest { when(performer.perform(any(), any())).thenReturn(operationResult); String opId = "my-op-id"; - when(operationHistoryManager.recordStart(any())).thenReturn(opId); + when(operationHistoryManager.recordStart(any(), any())).thenReturn(opId); executorManager.submit(operation); - await().untilAsserted(() -> { - verify(operationHistoryManager) - .recordEnd(eq(opId), same(operationResult), isNull()); - }); + await().untilAsserted(() -> verify(operationHistoryManager) + .recordEnd(eq(opId), same(operationResult), isNull())); } @Test @@ -115,14 +115,12 @@ public class OperationManagerTest { RuntimeException thrownByPerformer = new RuntimeException(); doThrow(thrownByPerformer).when(performer).perform(any(), any()); String opId = "my-op-id"; - when(operationHistoryManager.recordStart(any())).thenReturn(opId); + when(operationHistoryManager.recordStart(any(), any())).thenReturn(opId); executorManager.submit(operation); - await().untilAsserted(() -> { - verify(operationHistoryManager) - .recordEnd(eq(opId), isNull(), same(thrownByPerformer)); - }); + await().untilAsserted(() -> verify(operationHistoryManager) + .recordEnd(eq(opId), isNull(), same(thrownByPerformer))); } @Test @@ -132,7 +130,7 @@ public class OperationManagerTest { CountDownLatch performerHasTestPermissionToComplete 
= new CountDownLatch(1); String opId = "my-op-id"; - when(operationHistoryManager.recordStart(any())).thenReturn(opId); + when(operationHistoryManager.recordStart(any(), any())).thenReturn(opId); OperationResult operationResult = mock(OperationResult.class); @@ -158,10 +156,8 @@ public class OperationManagerTest { performerHasTestPermissionToComplete.countDown(); - await().untilAsserted(() -> { - verify(operationHistoryManager) - .recordEnd(eq(opId), same(operationResult), isNull()); - }); + await().untilAsserted(() -> verify(operationHistoryManager) + .recordEnd(eq(opId), same(operationResult), isNull())); } @Test diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationStateTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationStateTest.java index 76fe431..1de907b 100644 --- a/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationStateTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationStateTest.java @@ -59,6 +59,14 @@ public class OperationStateTest { } @Test + public void getLocator() { + String locator = "locator"; + OperationState operationState = new OperationState(null, null, null); + operationState.setLocator(locator); + assertThat(operationState.getLocator()).isSameAs(locator); + } + + @Test public void getResult() { OperationState<?, OperationResult> operationState = new OperationState<>(null, null, null); assertThat(operationState.getResult()).isNull(); diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/operation/RegionOperationStateStoreTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/operation/RegionOperationStateStoreTest.java index a63cf00..1e7525e 100644 --- a/geode-core/src/test/java/org/apache/geode/management/internal/operation/RegionOperationStateStoreTest.java +++ 
b/geode-core/src/test/java/org/apache/geode/management/internal/operation/RegionOperationStateStoreTest.java @@ -61,8 +61,9 @@ public class RegionOperationStateStoreTest { public void recordStartReturnsAnIdFromProvidedSupplier() { ClusterManagementOperation<OperationResult> operation = mock(ClusterManagementOperation.class); String uniqueId = ";lkajdfa;ldkjfppoiuqe.,.,mnavc098"; + String locator = "locator"; when(uniqueIdSupplier.get()).thenReturn(uniqueId); - String opId = service.recordStart(operation); + String opId = service.recordStart(operation, locator); assertThat(opId).isSameAs(uniqueId); verify(uniqueIdSupplier).get(); @@ -71,8 +72,9 @@ public class RegionOperationStateStoreTest { @Test public void recordStartRecordsOperationStatusInGivenRegion() { ClusterManagementOperation<OperationResult> operation = mock(ClusterManagementOperation.class); + String locator = "locator"; - String opId = service.recordStart(operation); + String opId = service.recordStart(operation, locator); ArgumentCaptor<OperationState> capturedOperationInstance = ArgumentCaptor.forClass( OperationState.class);
