This is an automated email from the ASF dual-hosted git repository.

jasonhuynh pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 4721b164501bba82309041522172276ed3042f4a
Author: Jianxia Chen <[email protected]>
AuthorDate: Fri Jul 10 14:42:01 2020 -0700

    GEODE-8200: Rebalance operations stuck in "IN_PROGRESS" state forever 
(#5350)
    
    Record the locator that issues the original REST API request. If the 
locator is offline afterwards, report an error.
    
    Co-authored-by: Jianxia Chen <[email protected]>
    Co-authored-by: Jinmei Liao <[email protected]>
    (cherry picked from commit 426b9de66ae9adadde8f1994b2340fc9de809d81)
---
 .../management/OperationManagementUpgradeTest.java | 123 +++++++++++++++++++++
 .../api/LocatorClusterManagementService.java       |   3 +-
 .../operation/OperationHistoryManager.java         |  35 +++++-
 .../internal/operation/OperationManager.java       |   3 +-
 .../internal/operation/OperationState.java         |  17 ++-
 .../internal/operation/OperationStateStore.java    |   2 +-
 .../operation/RegionOperationStateStore.java       |   4 +-
 .../sanctioned-geode-core-serializables.txt        |   2 +-
 .../api/LocatorClusterManagementServiceTest.java   |  32 ++++++
 .../operation/OperationHistoryManagerTest.java     |  35 +++++-
 .../internal/operation/OperationManagerTest.java   |  30 +++--
 .../internal/operation/OperationStateTest.java     |   8 ++
 .../operation/RegionOperationStateStoreTest.java   |   6 +-
 13 files changed, 262 insertions(+), 38 deletions(-)

diff --git 
a/geode-assembly/src/upgradeTest/java/org/apache/geode/management/OperationManagementUpgradeTest.java
 
b/geode-assembly/src/upgradeTest/java/org/apache/geode/management/OperationManagementUpgradeTest.java
new file mode 100644
index 0000000..6121a50
--- /dev/null
+++ 
b/geode-assembly/src/upgradeTest/java/org/apache/geode/management/OperationManagementUpgradeTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management;
+
+import static org.apache.geode.test.dunit.Host.getHost;
+import static 
org.apache.geode.test.junit.rules.gfsh.GfshRule.startLocatorCommand;
+import static 
org.apache.geode.test.junit.rules.gfsh.GfshRule.startServerCommand;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.management.api.ClusterManagementOperationResult;
+import org.apache.geode.management.api.ClusterManagementResult;
+import org.apache.geode.management.api.ClusterManagementService;
+import org.apache.geode.management.client.ClusterManagementServiceBuilder;
+import org.apache.geode.management.operation.RebalanceOperation;
+import org.apache.geode.management.runtime.RebalanceResult;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.internal.DUnitLauncher;
+import org.apache.geode.test.junit.categories.BackwardCompatibilityTest;
+import org.apache.geode.test.junit.rules.gfsh.GfshExecution;
+import org.apache.geode.test.junit.rules.gfsh.GfshRule;
+import org.apache.geode.test.junit.rules.gfsh.GfshScript;
+import 
org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
+import org.apache.geode.test.version.TestVersion;
+import org.apache.geode.test.version.VersionManager;
+
+@Category({BackwardCompatibilityTest.class})
+@RunWith(Parameterized.class)
+@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
+public class OperationManagementUpgradeTest {
+  private final String oldVersion;
+  private VM vm;
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<String> data() {
+    List<String> result = 
VersionManager.getInstance().getVersionsWithoutCurrent();
+    result.removeIf(s -> TestVersion.compare(s, "1.13.0") < 0);
+    return result;
+  }
+
+  public OperationManagementUpgradeTest(String version) {
+    oldVersion = version;
+    oldGfsh = new GfshRule(oldVersion);
+    DUnitLauncher.launchIfNeeded(false);
+    // get the vm with the same version of the oldGfsh
+    vm = getHost(0).getVM(oldVersion, 0);
+  }
+
+  @Rule
+  public GfshRule oldGfsh;
+
+  @Rule
+  public GfshRule gfsh = new GfshRule();
+
+  @Test
+  public void newLocatorCanReadOldConfigurationData() {
+    int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(7);
+    int locatorPort1 = ports[0];
+    int jmxPort1 = ports[1];
+    int httpPort1 = ports[2];
+    int locatorPort2 = ports[3];
+    int jmxPort2 = ports[4];
+    int httpPort2 = ports[5];
+    int serverPort = ports[6];
+    GfshExecution execute =
+        GfshScript.of(startLocatorCommand("locator1", locatorPort1, jmxPort1, 
httpPort1, 0))
+            .and(startLocatorCommand("locator2", locatorPort2, jmxPort2, 
httpPort2, locatorPort1))
+            .and(startServerCommand("server", serverPort, locatorPort1))
+            .execute(oldGfsh);
+
+    String operationId = vm.invoke(() -> {
+      // start a cms client that connects to locator1's http port
+      ClusterManagementService cms = new ClusterManagementServiceBuilder()
+          .setHost("localhost")
+          .setPort(httpPort1)
+          .build();
+
+      ClusterManagementOperationResult<RebalanceOperation, RebalanceResult> 
startResult =
+          cms.start(new RebalanceOperation());
+      assertThat(startResult.getStatusCode())
+          .isEqualTo(ClusterManagementResult.StatusCode.ACCEPTED);
+      return startResult.getOperationId();
+    });
+
+    // stop locator1
+    oldGfsh.stopLocator(execute, "locator1");
+    // use new gfsh to start locator1, make sure new locator can start
+    GfshScript.of(startLocatorCommand("locator1", locatorPort1, jmxPort1, 
httpPort1, locatorPort2))
+        .execute(gfsh, execute.getWorkingDir());
+
+    // use the new cms client
+    ClusterManagementService cms = new ClusterManagementServiceBuilder()
+        .setHost("localhost")
+        .setPort(httpPort1)
+        .build();
+    ClusterManagementOperationResult<RebalanceOperation, RebalanceResult> 
operationResult =
+        cms.get(new RebalanceOperation(), operationId);
+    System.out.println(operationResult);
+    
assertThat(operationResult.getStatusCode()).isEqualTo(ClusterManagementResult.StatusCode.OK);
+  }
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/api/LocatorClusterManagementService.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/api/LocatorClusterManagementService.java
index f20bc36..5256359 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/api/LocatorClusterManagementService.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/api/LocatorClusterManagementService.java
@@ -122,7 +122,7 @@ public class LocatorClusterManagementService implements 
ClusterManagementService
     this(cache, persistenceService, new ConcurrentHashMap<>(), new 
ConcurrentHashMap<>(),
         new MemberValidator(cache, persistenceService), new 
CommonConfigurationValidator(),
         new OperationManager(cache,
-            new OperationHistoryManager(new 
RegionOperationStateStore(cache))));
+            new OperationHistoryManager(new RegionOperationStateStore(cache), 
cache)));
     // initialize the list of managers
     managers.put(Region.class, new RegionConfigManager(persistenceService));
     managers.put(Pdx.class, new PdxManager(persistenceService));
@@ -484,6 +484,7 @@ public class LocatorClusterManagementService implements 
ClusterManagementService
     if (operationState == null) {
       raise(StatusCode.ENTITY_NOT_FOUND, "Operation '" + opId + "' does not 
exist.");
     }
+
     return toClusterManagementOperationResult(operationState);
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationHistoryManager.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationHistoryManager.java
index 5f09252..5873ee2 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationHistoryManager.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationHistoryManager.java
@@ -22,6 +22,7 @@ import java.util.stream.Collectors;
 
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.management.api.ClusterManagementOperation;
 import org.apache.geode.management.runtime.OperationResult;
 
@@ -35,22 +36,25 @@ import org.apache.geode.management.runtime.OperationResult;
 public class OperationHistoryManager {
   private final long keepCompletedMillis;
   private final OperationStateStore operationStateStore;
+  private final InternalCache cache;
 
   /**
-   * set a default retention policy to keep results for 2 hours after 
completion
+   * set a default retention policy to keep results for 7 days after completion
    */
   public OperationHistoryManager(
-      OperationStateStore operationStateStore) {
-    this(Duration.ofDays(7), operationStateStore);
+      OperationStateStore operationStateStore, InternalCache cache) {
+    this(Duration.ofDays(7), operationStateStore, cache);
   }
 
   /**
    * set a custom retention policy to keep results for X amount of time after 
completion
    */
   public OperationHistoryManager(Duration keepCompleted,
-      OperationStateStore operationStateStore) {
+      OperationStateStore operationStateStore,
+      InternalCache cache) {
     keepCompletedMillis = keepCompleted.toMillis();
     this.operationStateStore = operationStateStore;
+    this.cache = cache;
   }
 
   /**
@@ -69,6 +73,7 @@ public class OperationHistoryManager {
     final long expirationTime = now() - keepCompletedMillis;
     Set<String> expiredKeys = operationStateStore.list()
         .stream()
+        .map(this::validateLocator)
         .filter(operationInstance -> isExpired(expirationTime, 
operationInstance))
         .map(OperationState::getId)
         .collect(Collectors.toSet());
@@ -90,13 +95,31 @@ public class OperationHistoryManager {
     return operationEnd.getTime() <= expirationTime;
   }
 
+  private OperationState<?, ?> validateLocator(OperationState<?, ?> 
operationState) {
+    if (isLocatorOffline(operationState)) {
+      operationState.setOperationEnd(new Date(), null,
+          new RuntimeException("Locator that initiated the Rest API operation 
is offline: "
+              + operationState.getLocator()));
+    }
+
+    return operationState;
+  }
+
+  private boolean isLocatorOffline(OperationState operationState) {
+    return operationState.getOperationEnd() == null
+        && (operationState.getLocator() != null)
+        && !cache.getMyId().toString().equals(operationState.getLocator())
+        && 
(!cache.getDistributedSystem().getAllOtherMembers().stream().map(Object::toString)
+            
.collect(Collectors.toSet()).contains(operationState.getLocator()));
+  }
+
   /**
    * Stores a new operation in the history and returns its unique identifier.
    */
-  public String recordStart(ClusterManagementOperation<?> op) {
+  public String recordStart(ClusterManagementOperation<?> op, String locator) {
     expireHistory();
 
-    return operationStateStore.recordStart(op);
+    return operationStateStore.recordStart(op, locator);
   }
 
   /**
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationManager.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationManager.java
index c5c7027..6dd1260 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationManager.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationManager.java
@@ -63,12 +63,11 @@ public class OperationManager implements AutoCloseable {
           op.getClass().getSimpleName()));
     }
 
-    String opId = historyManager.recordStart(op);
+    String opId = historyManager.recordStart(op, cache.getMyId().toString());
     // get the operationState BEFORE we start the async thread
     // so that start will return a result that is not influenced
     // by how far the async thread gets in its execution.
     OperationState<A, V> operationState = historyManager.get(opId);
-
     CompletableFuture.supplyAsync(() -> performer.perform(cache, op), executor)
         .whenComplete((result, exception) -> {
           Throwable cause = exception == null ? null : exception.getCause();
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationState.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationState.java
index a7cb8ba..4c944b5 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationState.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationState.java
@@ -28,12 +28,25 @@ import org.apache.geode.management.runtime.OperationResult;
  */
 public class OperationState<A extends ClusterManagementOperation<V>, V extends 
OperationResult>
     implements Identifiable<String> {
+  private static final long serialVersionUID = 8212319653561969588L;
   private final String opId;
   private final A operation;
   private final Date operationStart;
   private Date operationEnd;
   private V result;
   private Throwable throwable;
+  private String locator;
+
+  public String getLocator() {
+    return this.locator;
+  }
+
+  public void setLocator(
+      String locator) {
+    synchronized (this) {
+      this.locator = locator;
+    }
+  }
 
   @Override
   public boolean equals(Object o) {
@@ -49,7 +62,8 @@ public class OperationState<A extends 
ClusterManagementOperation<V>, V extends O
         Objects.equals(getOperationStart(), that.getOperationStart()) &&
         Objects.equals(getOperationEnd(), that.getOperationEnd()) &&
         Objects.equals(getResult(), that.getResult()) &&
-        Objects.equals(getThrowable(), that.getThrowable());
+        Objects.equals(getThrowable(), that.getThrowable()) &&
+        Objects.equals(getLocator(), that.getLocator());
   }
 
   @Override
@@ -97,6 +111,7 @@ public class OperationState<A extends 
ClusterManagementOperation<V>, V extends O
       result.operationEnd = this.operationEnd;
       result.result = this.result;
       result.throwable = this.throwable;
+      result.locator = this.locator;
     }
     return result;
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationStateStore.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationStateStore.java
index aea2523..d2b8c1c 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationStateStore.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/operation/OperationStateStore.java
@@ -34,7 +34,7 @@ public interface OperationStateStore {
    *
    * @return an identifier for the state of the operation
    */
-  <A extends ClusterManagementOperation<?>> String recordStart(A operation);
+  <A extends ClusterManagementOperation<?>> String recordStart(A operation, 
String locator);
 
   /**
    * Returns a single instance of an {@link OperationState} for a given
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/operation/RegionOperationStateStore.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/operation/RegionOperationStateStore.java
index 4bd2559..8bc51cc 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/operation/RegionOperationStateStore.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/operation/RegionOperationStateStore.java
@@ -70,10 +70,12 @@ public class RegionOperationStateStore
   }
 
   @Override
-  public <A extends ClusterManagementOperation<?>> String recordStart(A 
operation) {
+  public <A extends ClusterManagementOperation<?>> String recordStart(A 
operation, String locator) {
     String opId = uniqueIdSupplier.get();
 
     OperationState operationInstance = new OperationState(opId, operation, new 
Date());
+    operationInstance.setLocator(locator);
+
     region.put(opId, operationInstance);
 
     return opId;
diff --git 
a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
 
b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
index 2d14263..e8b3ec3 100644
--- 
a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
+++ 
b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
@@ -456,7 +456,7 @@ 
org/apache/geode/management/internal/functions/CliFunctionResult$StatusState,fal
 
org/apache/geode/management/internal/functions/GetMemberInformationFunction,true,1404642539058875565
 org/apache/geode/management/internal/functions/RebalanceFunction,true,1
 
org/apache/geode/management/internal/functions/RestoreRedundancyFunction,true,-8991672237560920252
-org/apache/geode/management/internal/operation/OperationState,false,opId:java/lang/String,operation:org/apache/geode/management/api/ClusterManagementOperation,operationEnd:java/util/Date,operationStart:java/util/Date,result:org/apache/geode/management/runtime/OperationResult,throwable:java/lang/Throwable
+org/apache/geode/management/internal/operation/OperationState,true,8212319653561969588,locator:java/lang/String,opId:java/lang/String,operation:org/apache/geode/management/api/ClusterManagementOperation,operationEnd:java/util/Date,operationStart:java/util/Date,result:org/apache/geode/management/runtime/OperationResult,throwable:java/lang/Throwable
 
org/apache/geode/management/internal/web/domain/QueryParameterSource,true,34131123582155,objectName:javax/management/ObjectName,queryExpression:javax/management/QueryExp
 
org/apache/geode/management/internal/web/shell/MBeanAccessException,true,813768898269516238
 
org/apache/geode/pdx/FieldType,false,defaultSerializedValue:java/nio/ByteBuffer,defaultValue:java/lang/Object,isFixedWidth:boolean,name:java/lang/String,width:int
diff --git 
a/geode-core/src/test/java/org/apache/geode/management/internal/api/LocatorClusterManagementServiceTest.java
 
b/geode-core/src/test/java/org/apache/geode/management/internal/api/LocatorClusterManagementServiceTest.java
index 5a6f543..003d442 100644
--- 
a/geode-core/src/test/java/org/apache/geode/management/internal/api/LocatorClusterManagementServiceTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/management/internal/api/LocatorClusterManagementServiceTest.java
@@ -51,7 +51,10 @@ import org.apache.geode.cache.configuration.CacheConfig;
 import org.apache.geode.cache.configuration.GatewayReceiverConfig;
 import org.apache.geode.cache.configuration.RegionConfig;
 import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.DistributedSystem;
 import 
org.apache.geode.distributed.internal.InternalConfigurationPersistenceService;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.config.JAXBService;
 import org.apache.geode.management.api.ClusterManagementException;
@@ -90,6 +93,9 @@ public class LocatorClusterManagementServiceTest {
 
   private LocatorClusterManagementService service;
   private InternalCache cache;
+  private InternalDistributedMember member;
+  private DistributedSystem distributedSystem;
+  private DistributedMember distributedMember;
   private InternalConfigurationPersistenceService persistenceService;
   private Region regionConfig;
   private ClusterManagementResult result;
@@ -108,6 +114,14 @@ public class LocatorClusterManagementServiceTest {
         JAXBService.create(CacheConfig.class)));
 
     cache = mock(InternalCache.class);
+    member = mock(InternalDistributedMember.class);
+    distributedSystem = mock(InternalDistributedSystem.class);
+    distributedMember = mock(InternalDistributedMember.class);
+    when(cache.getMyId()).thenReturn(member);
+    when(cache.getDistributedSystem()).thenReturn(distributedSystem);
+    Set<DistributedMember> members = new HashSet<>();
+    members.add(distributedMember);
+    when(distributedSystem.getAllOtherMembers()).thenReturn(members);
     regionValidator = mock(RegionConfigValidator.class);
     
doCallRealMethod().when(regionValidator).validate(eq(CacheElementOperation.DELETE),
 any());
     regionManager = spy(new RegionConfigManager(persistenceService));
@@ -389,6 +403,8 @@ public class LocatorClusterManagementServiceTest {
   @Test
   public void getRebalance() {
     OperationState operationState = mock(OperationState.class);
+    String locator = member.toString();
+    when(operationState.getLocator()).thenReturn(locator);
     when(operationManager.get(any())).thenReturn(operationState);
     ClusterManagementOperationResult<RebalanceOperation, RebalanceResult> 
result =
         service.get(rebalanceOperation, "456");
@@ -418,6 +434,22 @@ public class LocatorClusterManagementServiceTest {
   }
 
   @Test
+  public void getRebalanceStatusWhenLocatorIsOffline() {
+    OperationState operationState = mock(OperationState.class);
+    when(operationManager.get(any())).thenReturn(operationState);
+    when(operationState.getOperationEnd()).thenReturn(new Date());
+    Throwable throwable =
+        new RuntimeException("Locator that initiated the Rest API operation is 
offline:");
+    when(operationState.getThrowable()).thenReturn(throwable);
+
+    ClusterManagementOperationResult result = service.get(rebalanceOperation, 
"456");
+
+    
assertThat(result.getStatusCode()).isEqualTo(ClusterManagementResult.StatusCode.ERROR);
+    assertThat(result.getStatusMessage())
+        .contains("Locator that initiated the Rest API operation is offline:");
+  }
+
+  @Test
   public void 
getRebalanceWithOperationResultThatFailedCorrectlySetsStatusMessage() {
     OperationState operationState = mock(OperationState.class);
     when(operationManager.get(any())).thenReturn(operationState);
diff --git 
a/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationHistoryManagerTest.java
 
b/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationHistoryManagerTest.java
index 1d2791a..a5b9d4a 100644
--- 
a/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationHistoryManagerTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationHistoryManagerTest.java
@@ -33,6 +33,9 @@ import java.util.List;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.geode.distributed.DistributedSystem;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.management.api.ClusterManagementOperation;
 import org.apache.geode.management.runtime.OperationResult;
 
@@ -43,7 +46,10 @@ public class OperationHistoryManagerTest {
   @Before
   public void setUp() throws Exception {
     operationStateStore = mock(OperationStateStore.class);
-    history = new OperationHistoryManager(Duration.ofHours(2), 
operationStateStore);
+    InternalCache cache = mock(InternalCache.class);
+    history = new OperationHistoryManager(Duration.ofHours(2), 
operationStateStore, cache);
+    when(cache.getMyId()).thenReturn(mock(InternalDistributedMember.class));
+    
when(cache.getDistributedSystem()).thenReturn(mock(DistributedSystem.class));
   }
 
   @Test
@@ -55,18 +61,20 @@ public class OperationHistoryManagerTest {
   public void recordStartReturnsExpectedOpId() {
     ClusterManagementOperation<?> op = mock(ClusterManagementOperation.class);
     String expectedOpId = "12345";
-    when(operationStateStore.recordStart(same(op))).thenReturn(expectedOpId);
+    String locator = "locator";
+    when(operationStateStore.recordStart(same(op), 
same(locator))).thenReturn(expectedOpId);
 
-    String opId = history.recordStart(op);
+    String opId = history.recordStart(op, locator);
     assertThat(opId).isSameAs(expectedOpId);
   }
 
   @Test
   public void recordStartDelegatesToPersistenceService() {
     ClusterManagementOperation<?> op = mock(ClusterManagementOperation.class);
+    String locator = "locator";
 
-    String opId = history.recordStart(op);
-    verify(operationStateStore).recordStart(same(op));
+    history.recordStart(op, locator);
+    verify(operationStateStore).recordStart(same(op), same(locator));
   }
 
   @Test
@@ -97,6 +105,21 @@ public class OperationHistoryManagerTest {
   }
 
   @Test
+  public void rebalanceLocatorIsOffline() {
+    OperationState<?, ?> operationState = new OperationState("opid", null, new 
Date());
+    operationState.setLocator("locator");
+    List<OperationState<?, ?>> ops = new ArrayList<>();
+    ops.add(operationState);
+    doReturn(ops).when(operationStateStore).list();
+
+    history.expireHistory();
+
+    assertThat(operationState.getOperationEnd()).isNotNull();
+    assertThat(operationState.getThrowable().getMessage())
+        .contains("Locator that initiated the Rest API operation is offline:");
+  }
+
+  @Test
   public void expireHistoryRetainsUnexpiredCompletedOperations() {
     List<OperationState<ClusterManagementOperation<OperationResult>, 
OperationResult>> sampleOps =
         new ArrayList<>();
@@ -169,7 +192,7 @@ public class OperationHistoryManagerTest {
   public void recordStartCallsExpireHistory() {
     OperationHistoryManager historySpy = spy(history);
 
-    historySpy.recordStart(null);
+    historySpy.recordStart(null, null);
     verify(historySpy).expireHistory();
   }
 
diff --git 
a/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationManagerTest.java
 
b/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationManagerTest.java
index 9596f72..48f01dd 100644
--- 
a/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationManagerTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationManagerTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.CountDownLatch;
 import org.junit.Before;
 import org.junit.Test;
 
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.management.api.ClusterManagementOperation;
 import org.apache.geode.management.runtime.OperationResult;
@@ -46,7 +47,8 @@ public class OperationManagerTest {
   public void setUp() throws Exception {
     operationHistoryManager = mock(OperationHistoryManager.class);
     cache = mock(InternalCache.class);
-
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    when(cache.getMyId()).thenReturn(member);
     executorManager = new OperationManager(cache, operationHistoryManager);
   }
 
@@ -74,7 +76,7 @@ public class OperationManagerTest {
     executorManager.registerOperation(
         (Class<ClusterManagementOperation<OperationResult>>) 
operation.getClass(), performer);
 
-    when(operationHistoryManager.recordStart(operation)).thenReturn(opId);
+    when(operationHistoryManager.recordStart(eq(operation), 
any())).thenReturn(opId);
     when(operationHistoryManager.get(opId)).thenReturn(expectedOpState);
 
     OperationState<ClusterManagementOperation<OperationResult>, 
OperationResult> operationState =
@@ -94,14 +96,12 @@ public class OperationManagerTest {
 
     when(performer.perform(any(), any())).thenReturn(operationResult);
     String opId = "my-op-id";
-    when(operationHistoryManager.recordStart(any())).thenReturn(opId);
+    when(operationHistoryManager.recordStart(any(), any())).thenReturn(opId);
 
     executorManager.submit(operation);
 
-    await().untilAsserted(() -> {
-      verify(operationHistoryManager)
-          .recordEnd(eq(opId), same(operationResult), isNull());
-    });
+    await().untilAsserted(() -> verify(operationHistoryManager)
+        .recordEnd(eq(opId), same(operationResult), isNull()));
   }
 
   @Test
@@ -115,14 +115,12 @@ public class OperationManagerTest {
     RuntimeException thrownByPerformer = new RuntimeException();
     doThrow(thrownByPerformer).when(performer).perform(any(), any());
     String opId = "my-op-id";
-    when(operationHistoryManager.recordStart(any())).thenReturn(opId);
+    when(operationHistoryManager.recordStart(any(), any())).thenReturn(opId);
 
     executorManager.submit(operation);
 
-    await().untilAsserted(() -> {
-      verify(operationHistoryManager)
-          .recordEnd(eq(opId), isNull(), same(thrownByPerformer));
-    });
+    await().untilAsserted(() -> verify(operationHistoryManager)
+        .recordEnd(eq(opId), isNull(), same(thrownByPerformer)));
   }
 
   @Test
@@ -132,7 +130,7 @@ public class OperationManagerTest {
     CountDownLatch performerHasTestPermissionToComplete = new 
CountDownLatch(1);
 
     String opId = "my-op-id";
-    when(operationHistoryManager.recordStart(any())).thenReturn(opId);
+    when(operationHistoryManager.recordStart(any(), any())).thenReturn(opId);
 
     OperationResult operationResult = mock(OperationResult.class);
 
@@ -158,10 +156,8 @@ public class OperationManagerTest {
 
     performerHasTestPermissionToComplete.countDown();
 
-    await().untilAsserted(() -> {
-      verify(operationHistoryManager)
-          .recordEnd(eq(opId), same(operationResult), isNull());
-    });
+    await().untilAsserted(() -> verify(operationHistoryManager)
+        .recordEnd(eq(opId), same(operationResult), isNull()));
   }
 
   @Test
diff --git 
a/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationStateTest.java
 
b/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationStateTest.java
index 76fe431..1de907b 100644
--- 
a/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationStateTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationStateTest.java
@@ -59,6 +59,14 @@ public class OperationStateTest {
   }
 
   @Test
+  public void getLocator() {
+    String locator = "locator";
+    OperationState operationState = new OperationState(null, null, null);
+    operationState.setLocator(locator);
+    assertThat(operationState.getLocator()).isSameAs(locator);
+  }
+
+  @Test
   public void getResult() {
     OperationState<?, OperationResult> operationState = new 
OperationState<>(null, null, null);
     assertThat(operationState.getResult()).isNull();
diff --git 
a/geode-core/src/test/java/org/apache/geode/management/internal/operation/RegionOperationStateStoreTest.java
 
b/geode-core/src/test/java/org/apache/geode/management/internal/operation/RegionOperationStateStoreTest.java
index a63cf00..1e7525e 100644
--- 
a/geode-core/src/test/java/org/apache/geode/management/internal/operation/RegionOperationStateStoreTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/management/internal/operation/RegionOperationStateStoreTest.java
@@ -61,8 +61,9 @@ public class RegionOperationStateStoreTest {
   public void recordStartReturnsAnIdFromProvidedSupplier() {
     ClusterManagementOperation<OperationResult> operation = 
mock(ClusterManagementOperation.class);
     String uniqueId = ";lkajdfa;ldkjfppoiuqe.,.,mnavc098";
+    String locator = "locator";
     when(uniqueIdSupplier.get()).thenReturn(uniqueId);
-    String opId = service.recordStart(operation);
+    String opId = service.recordStart(operation, locator);
 
     assertThat(opId).isSameAs(uniqueId);
     verify(uniqueIdSupplier).get();
@@ -71,8 +72,9 @@ public class RegionOperationStateStoreTest {
   @Test
   public void recordStartRecordsOperationStatusInGivenRegion() {
     ClusterManagementOperation<OperationResult> operation = 
mock(ClusterManagementOperation.class);
+    String locator = "locator";
 
-    String opId = service.recordStart(operation);
+    String opId = service.recordStart(operation, locator);
 
     ArgumentCaptor<OperationState> capturedOperationInstance = 
ArgumentCaptor.forClass(
         OperationState.class);

Reply via email to