This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 796d97b Revert "SAMZA-2475: Add a allocated resource expiry timeout
in samza yarn type of apps (#1296)" (#1302)
796d97b is described below
commit 796d97bc5f354580eafcc4208e5320530862ccc1
Author: rmatharu <[email protected]>
AuthorDate: Wed Mar 4 13:10:22 2020 -0800
Revert "SAMZA-2475: Add a allocated resource expiry timeout in samza yarn
type of apps (#1296)" (#1302)
This reverts commit 3a6e48cc91d3e03e8fe17ab4283182fe4d3f98a5.
---
.../clustermanager/ClusterResourceManager.java | 9 -----
.../samza/clustermanager/ContainerAllocator.java | 8 ----
.../samza/clustermanager/ContainerManager.java | 23 -----------
.../apache/samza/clustermanager/SamzaResource.java | 18 ---------
.../clustermanager/MockClusterResourceManager.java | 12 ------
.../TestContainerAllocatorWithHostAffinity.java | 45 ----------------------
.../samza/job/yarn/YarnClusterResourceManager.java | 10 -----
7 files changed, 125 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
index 8ea3c30..276bb4c 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
@@ -132,15 +132,6 @@ public abstract class ClusterResourceManager {
public abstract void stop(SamzaApplicationState.SamzaAppStatus status);
- /**
- * Checks if the allocated resource is expired. If the {@link
ClusterResourceManager} does not have a
- * concept of expired allocated resource we assume allocated resources never
expire
- * @param resource allocated resource
- * @return if the allocated resource is expired
- */
- public boolean isResourceExpired(SamzaResource resource) {
- return false;
- }
/***
* Defines a callback interface for interacting with notifications from a
ClusterResourceManager
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 2661611..2e223fc 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -261,14 +261,6 @@ public class ContainerAllocator implements Runnable {
throw new SamzaException("Expected resource for Processor ID: " +
request.getProcessorId() + " was unavailable on host: " + preferredHost);
}
- /**
- * If the allocated resource has expired then release the expired resource
and re-request the resources from {@link ClusterResourceManager}
- */
- if (clusterResourceManager.isResourceExpired(resource)) {
- containerManager.handleExpiredResource(request, resource, preferredHost,
resourceRequestState, this);
- return;
- }
-
// Update state
resourceRequestState.updateStateAfterAssignment(request, preferredHost,
resource);
String processorId = request.getProcessorId();
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index 7ba97ee..3e3a060 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -305,29 +305,6 @@ public class ContainerManager {
}
/**
- * Handles expired allocated resource by requesting the same resource again
and release the expired allocated resource
- *
- * @param request pending request for the preferred host
- * @param resource resource allocated from {@link ClusterResourceManager}
which has expired
- * @param preferredHost host on which container is requested to be deployed
- * @param resourceRequestState state of request in {@link ContainerAllocator}
- * @param allocator allocator for requesting resources
- */
- void handleExpiredResource(SamzaResourceRequest request, SamzaResource
resource, String preferredHost,
- ResourceRequestState resourceRequestState, ContainerAllocator allocator)
{
- LOG.info("Allocated resource {} has expired for Processor ID: {} request:
{}. Re-requesting resource again",
- resource, request.getProcessorId(), request);
- resourceRequestState.releaseUnstartableContainer(resource, preferredHost);
- resourceRequestState.cancelResourceRequest(request);
- SamzaResourceRequest newResourceRequest =
allocator.getResourceRequest(request.getProcessorId(),
request.getPreferredHost());
- if
(hasActiveContainerPlacementAction(newResourceRequest.getProcessorId())) {
- ContainerPlacementMetadata metadata =
getPlacementActionMetadata(request.getProcessorId()).get();
- metadata.recordResourceRequest(newResourceRequest);
- }
- allocator.issueResourceRequest(newResourceRequest);
- }
-
- /**
* Registers a container placement action to move the running container to
destination host, if destination host is same as the
* host on which container is running, container placement action is treated
as a restart.
*
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
index 30c0902..4d0bf91 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
@@ -19,9 +19,6 @@
package org.apache.samza.clustermanager;
-import com.google.common.annotations.VisibleForTesting;
-
-
/**
* Specification of a Samza Resource. A resource is identified by a unique
resource ID.
* A resource is currently comprised of CPUs and Memory resources on a host.
@@ -31,7 +28,6 @@ public class SamzaResource {
private final int memoryMb;
private final String host;
private final String containerId;
- private final long timestamp;
//TODO: Investigate adding disk space. Mesos supports disk based
reservations.
@@ -40,16 +36,6 @@ public class SamzaResource {
this.memoryMb = memoryMb;
this.host = host;
this.containerId = containerId;
- this.timestamp = System.currentTimeMillis();
- }
-
- @VisibleForTesting
- SamzaResource(int numCores, int memoryMb, String host, String containerId,
long timestamp) {
- this.numCores = numCores;
- this.memoryMb = memoryMb;
- this.host = host;
- this.containerId = containerId;
- this.timestamp = timestamp;
}
@Override
@@ -96,8 +82,4 @@ public class SamzaResource {
public String getContainerId() {
return containerId;
}
-
- public long getTimestamp() {
- return timestamp;
- }
}
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
index e4be156..d50ce59 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
@@ -20,7 +20,6 @@
package org.apache.samza.clustermanager;
import com.google.common.collect.ImmutableList;
-import java.time.Duration;
import org.apache.samza.job.CommandBuilder;
import org.junit.Assert;
@@ -109,13 +108,6 @@ public class MockClusterResourceManager extends
ClusterResourceManager {
clusterManagerCallback.onResourcesCompleted(statList);
}
- @Override
- public boolean isResourceExpired(SamzaResource resource) {
- Duration yarnAllocatedResourceExpiry =
Duration.ofMinutes(10).minus(Duration.ofSeconds(30));
- return System.currentTimeMillis() - resource.getTimestamp() >
yarnAllocatedResourceExpiry.toMillis();
- }
-
-
public void registerContainerListener(MockContainerListener listener) {
mockContainerListeners.add(listener);
}
@@ -124,10 +116,6 @@ public class MockClusterResourceManager extends
ClusterResourceManager {
mockContainerListeners.clear();
}
- public boolean containsReleasedResource(SamzaResource resource) {
- return releasedResources.contains(resource);
- }
-
@Override
public void stop(SamzaApplicationState.SamzaAppStatus status) {
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index 593ddb9..bf2eabf 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -511,51 +511,6 @@ public class TestContainerAllocatorWithHostAffinity {
containerAllocator.stop();
}
- @Test(timeout = 5000)
- public void testExpiredAllocatedResourcesAreReleased() throws Exception {
- ClusterResourceManager.Callback mockCPM =
mock(MockClusterResourceManagerCallback.class);
- MockClusterResourceManager mockClusterResourceManager = new
MockClusterResourceManager(mockCPM, state);
- ContainerManager spyContainerManager =
- spy(new ContainerManager(containerPlacementMetadataStore, state,
mockClusterResourceManager, true, false));
-
- SamzaResource expiredAllocatedResource = new SamzaResource(1, 1000,
"host-0", "id0",
- System.currentTimeMillis() - Duration.ofMinutes(10).toMillis());
- spyAllocator = Mockito.spy(
- new ContainerAllocator(mockClusterResourceManager, config, state,
true, spyContainerManager));
- spyAllocator.addResource(expiredAllocatedResource);
- spyAllocator.addResource(new SamzaResource(1, 1000, "host-1", "1d1"));
-
- // Request Preferred Resources
- spyAllocator.requestResources(new HashMap<String, String>() {
- {
- put("0", "host-0");
- put("1", "host-1");
- }
- });
-
- spyAllocatorThread = new Thread(spyAllocator);
- // Start the container allocator thread periodic assignment
- spyAllocatorThread.start();
-
- // Wait until allocated resource is expired
- while (state.preferredHostRequests.get() != 3) {
- Thread.sleep(100);
- }
-
- // Verify that handleExpiredResource was invoked once for expired
allocated resource
- ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor =
ArgumentCaptor.forClass(SamzaResourceRequest.class);
- ArgumentCaptor<SamzaResource> resourceArgumentCaptor =
ArgumentCaptor.forClass(SamzaResource.class);
- verify(spyContainerManager,
times(1)).handleExpiredResource(resourceRequestCaptor.capture(),
- resourceArgumentCaptor.capture(), eq("host-0"), any(), any());
- resourceRequestCaptor.getAllValues()
- .forEach(resourceRequest ->
assertEquals(resourceRequest.getProcessorId(), "0"));
- resourceArgumentCaptor.getAllValues()
- .forEach(resource -> assertEquals(resource.getHost(), "host-0"));
- // Verify resources were released
-
assertTrue(mockClusterResourceManager.containsReleasedResource(expiredAllocatedResource));
- containerAllocator.stop();
- }
-
//@Test
public void testExpiryWithNonResponsiveClusterManager() throws Exception {
diff --git
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 8d23e04..43c49cc 100644
---
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -19,7 +19,6 @@
package org.apache.samza.job.yarn;
-import java.time.Duration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -576,15 +575,6 @@ public class YarnClusterResourceManager extends
ClusterResourceManager implement
}
}
- @Override
- public boolean isResourceExpired(SamzaResource resource) {
- // Time from which resource was allocated > Yarn Expiry Timeout - 30 sec
(to account for clock skew)
- Duration yarnAllocatedResourceExpiry =
-
Duration.ofMinutes(YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS)
- .minus(Duration.ofSeconds(30));
- return System.currentTimeMillis() - resource.getTimestamp() >
yarnAllocatedResourceExpiry.toMillis();
- }
-
/**
* Runs a process as specified by the command builder on the container.
* @param processorId id of the samza processor to run (passed as a command
line parameter to the process)