>From Ali Alsuliman <[email protected]>:
Ali Alsuliman has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21279?usp=email )
Change subject: [ASTERIXDB-3649][HYR][API] Sweep only completed jobs +
RESULT_TTL change
......................................................................
[ASTERIXDB-3649][HYR][API] Sweep only completed jobs + RESULT_TTL change
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- when a request (having a linked job id) is processed,
remove the job record and the request for ones that
are done.
- when a request is discarded, allow discarding jobs
having a terminal state like "failed", i.e. only disallow
discarding in-progress jobs.
- change default RESULT_TTL to 1 hour instead of 24 hours.
Ext-ref: MB-71997
Change-Id: Ide6d7df9e522fe96e45808a2fd6b7eb802477093
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/result/AbstractResultManager.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java
14 files changed, 88 insertions(+), 58 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/79/21279/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b7e195b..8e165c0 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -373,7 +373,7 @@
public void compileAndExecute(IHyracksClientConnection hcc,
IRequestParameters requestParameters) throws Exception {
validateStatements(requestParameters);
boolean trackInAsyncDeferredRequests =
shouldTrackAsSeparateRequest(requestParameters);
- trackRequest(requestParameters, trackInAsyncDeferredRequests);
+ IClientRequest clientRequest = trackRequest(requestParameters,
trackInAsyncDeferredRequests);
Counter resultSetIdCounter = new Counter(0);
FileSplit outputFile = null;
String threadName = Thread.currentThread().getName();
@@ -613,34 +613,30 @@
if (statements.isEmpty() || ResultDelivery.ASYNC !=
resultDelivery) {
// this assumes 1-1 mapping between a request and a statement,
needs to be adapted for multi-statement
appCtx.getRequestTracker().complete(reqId);
- if (ResultDelivery.ASYNC != resultDelivery) {
- completeDeferred(reqId, hasResultSet, exception);
+ if (ResultDelivery.ASYNC != resultDelivery &&
trackInAsyncDeferredRequests) {
+ completeDeferred((ClientRequest) clientRequest,
hasResultSet, exception);
}
}
Thread.currentThread().setName(threadName);
}
}
- private void completeDeferred(String reqId, boolean hasResultSet,
Exception exception) {
+ private void completeDeferred(ClientRequest clientRequest, boolean
hasResultSet, Exception exception) {
try {
- Optional<IClientRequest> optClientReq =
appCtx.getRequestTracker().getAsyncOrDeferredRequest(reqId);
- if (optClientReq.isPresent()) {
- ClientRequest clientRequest = (ClientRequest)
optClientReq.get();
- JobId jobId = clientRequest.getJobId();
- if (jobId == null) {
- // jobId = null either means:
- // 1. compile error, compile-only, ... resulting in no job
created whether query, DML or DDL
- // 2. statement currently not setting jobId in the client
request, e.g. DDLs
- // for 2. it needs to be handled so that the job record is
removed also if not producing a result
-
appCtx.getRequestTracker().removeAsyncOrDeferredRequest(reqId);
- } else if (!hasResultSet) {
- // don't sweep ones that completed successfully and
produced a result
- // sweeps statements not producing a result, e.g. DMLs
without a return clause
- // sweeps statements that threw an exception whether
query, DML or DDL
- ClusterControllerService ccSvs =
- (ClusterControllerService)
appCtx.getServiceContext().getControllerService();
- ccSvs.getResultDirectoryService().sweep(jobId);
- }
+ JobId jobId = clientRequest.getJobId();
+ if (jobId == null) {
+ // jobId = null either means:
+ // 1. compile error, compile-only, ... resulting in no job
created whether query, DML or DDL
+ // 2. statement currently not setting jobId in the client
request, e.g. DDLs
+ // for 2. it needs to be handled so that the job record is
removed also if not producing a result
+
appCtx.getRequestTracker().removeAsyncOrDeferredRequest(clientRequest.getId());
+ } else if (!hasResultSet) {
+ // don't sweep ones that completed successfully and produced a
result
+ // sweeps statements not producing a result, e.g. DMLs without
a return clause
+ // sweeps statements that threw an exception whether query,
DML or DDL
+ ClusterControllerService ccSvs =
+ (ClusterControllerService)
appCtx.getServiceContext().getControllerService();
+ ccSvs.getResultDirectoryService().removeIfDone(jobId);
}
} catch (Throwable th) {
if (exception != null) {
@@ -6046,13 +6042,14 @@
}
}
- protected void trackRequest(IRequestParameters requestParameters, boolean
trackInAsyncDeferredRequests)
+ protected IClientRequest trackRequest(IRequestParameters
requestParameters, boolean trackInAsyncDeferredRequests)
throws HyracksDataException {
final IClientRequest clientRequest =
appCtx.getReceptionist().requestReceived(requestParameters);
this.appCtx.getRequestTracker().track(clientRequest);
if (trackInAsyncDeferredRequests) {
appCtx.getRequestTracker().trackAsyncOrDeferredRequest(clientRequest);
}
+ return clientRequest;
}
/**
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java
index b92ec57..bce9f7b 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java
@@ -39,10 +39,10 @@
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.ResultDirectoryRecord;
import org.apache.hyracks.api.result.ResultJobRecord;
import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.api.result.ResultSetMetaData;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.result.IResultDirectoryService;
import org.apache.hyracks.control.common.work.IResultCallback;
@@ -74,20 +74,25 @@
((ClusterControllerService)
appCtx.getServiceContext().getControllerService())
.getResultDirectoryService();
// Check if result is in a valid state for discarding
- ResultJobRecord.Status status =
resultDirectoryService.getResultStatus(jobId, resultSetId);
- if (status.getState() != ResultJobRecord.State.SUCCESS) {
- LOGGER.log(Level.WARN, "Cannot discard result for job {}, result
set {}, request {} - status is {}", jobId,
- resultSetId, requestId, status);
+ ResultJobRecord jobRecord = resultDirectoryService.getJobRecord(jobId);
+ if (jobRecord == null) {
+ LOGGER.warn("Job record not found for job {}, request {}. Removing
request tracking info", jobId,
+ requestId);
+ removeRequest(appCtx, requestId);
return;
}
- IResultMetadata resultMetadata =
resultDirectoryService.getResultMetadata(jobId, resultSetId);
- if (resultMetadata == null) {
+ if (!jobRecord.isDone()) {
+ LOGGER.warn("Cannot discard result for job {}, result set {},
request {} - status is {}", jobId,
+ resultSetId, requestId, jobRecord.getStatus().getState());
+ return;
+ }
+ ResultSetMetaData resultSetMetaData = jobRecord.getResultSetMetaData();
+ if (resultSetMetaData == null || resultSetMetaData.getMetadata() ==
null) {
LOGGER.debug(
- "Result metadata not found for job {}, result set {},
request id {}. Removing async req tracking info",
+ "Result metadata not found for job {}, result set {},
request {}. Removing request tracking info and job record",
jobId, resultSetId, requestId);
- if (requestId != null) {
-
appCtx.getRequestTracker().removeAsyncOrDeferredRequest(requestId);
- }
+ resultDirectoryService.sweep(jobId);
+ removeRequest(appCtx, requestId);
return;
}
@@ -105,6 +110,10 @@
// Clean up result directory and request tracking
resultDirectoryService.sweep(jobId);
+ removeRequest(appCtx, requestId);
+ }
+
+ private static void removeRequest(ICcApplicationContext appCtx, String
requestId) {
if (requestId != null) {
appCtx.getRequestTracker().removeAsyncOrDeferredRequest(requestId);
}
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 54b4004..bd0d4dc 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -229,7 +229,7 @@
ccConfig.setClientListenAddress(Inet4Address.getLoopbackAddress().getHostAddress());
ccConfig.setClientListenPort(DEFAULT_HYRACKS_CC_CLIENT_PORT);
ccConfig.setClusterListenPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT);
- ccConfig.setResultTTL(RESULT_TTL);
+ ccConfig.setResultTTLMillis(RESULT_TTL);
ccConfig.setResultSweepThreshold(1000L);
ccConfig.setEnforceFrameWriterProtocol(true);
configManager.set(ControllerConfig.Option.DEFAULT_DIR,
joinPath(getDefaultStoragePath(), "asterixdb"));
@@ -258,7 +258,7 @@
ncConfig.setDataListenAddress(Inet4Address.getLoopbackAddress().getHostAddress());
ncConfig.setResultListenAddress(Inet4Address.getLoopbackAddress().getHostAddress());
ncConfig.setMessagingListenAddress(Inet4Address.getLoopbackAddress().getHostAddress());
- ncConfig.setResultTTL(RESULT_TTL);
+ ncConfig.setResultTTLMillis(RESULT_TTL);
ncConfig.setResultSweepThreshold(1000L);
ncConfig.setVirtualNC();
configManager.set(ControllerConfig.Option.DEFAULT_DIR,
joinPath(getDefaultStoragePath(), "asterixdb", ncName));
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java
index 752ff4f..4baa9ea 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java
@@ -23,7 +23,7 @@
/**
* Returns the timestamp in nanoseconds when the record is created.
*/
- long getTimestamp();
+ long getTimestampNanos();
/**
* Returns the per-request result TTL in nanoseconds, or -1 if system
default should be used.
@@ -33,6 +33,6 @@
/**
* Returns the timestamp in nanoseconds when the record is completed, or 0
if not completed yet.
*/
- long getCompleteTimestamp();
+ long getCompleteTimestampNanos();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
index eae1ba0..a9a0492 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
@@ -156,13 +156,17 @@
updateState(State.TIMEOUT);
}
+ public boolean isDone() {
+ return status.state != State.RUNNING && status.state != State.IDLE;
+ }
+
@Override
- public long getTimestamp() {
+ public long getTimestampNanos() {
return timestamp;
}
@Override
- public long getCompleteTimestamp() {
+ public long getCompleteTimestampNanos() {
return jobEndTime;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index a386ab8..f7ea382 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -181,7 +181,7 @@
ccContext = new ClusterControllerContext(topology);
sweeper = new DeadNodeSweeper();
resultDirectoryService =
- new ResultDirectoryService(ccConfig.getResultTTL(),
ccConfig.getResultSweepThreshold());
+ new ResultDirectoryService(ccConfig.getResultTTLMillis(),
ccConfig.getResultSweepThreshold());
deploymentRunMap = new HashMap<>();
stateDumpRunMap = new HashMap<>();
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
index 0366774..c2e49c6 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.api.result.IResultManager;
import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.ResultDirectoryRecord;
+import org.apache.hyracks.api.result.ResultJobRecord;
import org.apache.hyracks.api.result.ResultJobRecord.Status;
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.control.common.work.IResultCallback;
@@ -55,4 +56,9 @@
public void getResultPartitionLocations(JobId jobId, ResultSetId rsId,
ResultDirectoryRecord[] knownLocations,
IResultCallback<ResultDirectoryRecord[]> callback) throws
HyracksDataException;
+
+ public void removeIfDone(JobId jobId);
+
+ public ResultJobRecord getJobRecord(JobId jobId);
+
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index d9d446f..40643d4 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -66,8 +66,8 @@
private final Map<JobId, JobResultInfo> jobResultLocations;
private IJobResultCallback jobResultCallback;
- public ResultDirectoryService(long resultTTL, long resultSweepThreshold) {
- super(resultTTL);
+ public ResultDirectoryService(long resultTTLMillis, long
resultSweepThreshold) {
+ super(resultTTLMillis);
this.resultSweepThreshold = resultSweepThreshold;
jobResultLocations = new LinkedHashMap<>();
}
@@ -102,6 +102,7 @@
}
@Override
+ //TODO: shouldn't this also be synchronized?
public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus
jobStatus, List<Exception> exceptions)
throws HyracksException {
ResultJobRecord resultJobRecord = getResultJobRecord(jobId);
@@ -228,6 +229,14 @@
}
@Override
+ public synchronized void removeIfDone(JobId jobId) {
+ ResultJobRecord resultJobRecord = getResultJobRecord(jobId);
+ if (resultJobRecord != null && resultJobRecord.isDone()) {
+ sweep(jobId);
+ }
+ }
+
+ @Override
public synchronized Set<JobId> getJobIds() {
return jobResultLocations.keySet();
}
@@ -238,6 +247,11 @@
}
@Override
+ public synchronized ResultJobRecord getJobRecord(JobId jobId) {
+ return getResultJobRecord(jobId);
+ }
+
+ @Override
public void sweep(JobId jobId) {
JobResultInfo removedJob = sweepJob(jobId);
if (removedJob != null) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index 18825da..3c85e2f 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -66,7 +66,7 @@
DEAD_NODE_SWEEP_THRESHOLD(LONG, HEARTBEAT_PERIOD),
PROFILE_DUMP_PERIOD(NONNEGATIVE_INTEGER, 0),
JOB_HISTORY_SIZE(NONNEGATIVE_INTEGER, 10),
- RESULT_TTL(LONG, 86400000L), // TODO(mblow): add time unit
+ RESULT_TTL(LONG, 3600000L), // TODO(mblow): add time unit
RESULT_SWEEP_THRESHOLD(LONG, 60000L), // TODO(mblow): add time unit
@SuppressWarnings("RedundantCast") // not redundant- false positive
from IDEA
ROOT_DIR(STRING, (Function<IApplicationConfig, String>) appConfig ->
FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR),
"ClusterControllerService"), "<value of " +
ControllerConfig.Option.DEFAULT_DIR.cmdline() + ">/ClusterControllerService"),
@@ -388,12 +388,12 @@
configManager.set(Option.JOB_HISTORY_SIZE, jobHistorySize);
}
- public long getResultTTL() {
+ public long getResultTTLMillis() {
return getAppConfig().getLong(Option.RESULT_TTL);
}
- public void setResultTTL(long resultTTL) {
- configManager.set(Option.RESULT_TTL, resultTTL);
+ public void setResultTTLMillis(long resultTTLMillis) {
+ configManager.set(Option.RESULT_TTL, resultTTLMillis);
}
public long getResultSweepThreshold() {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index f55dd59..50e9e42 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -82,7 +82,7 @@
"<value of " + ControllerConfig.Option.DEFAULT_DIR.cmdline() +
">/iodevice"),
NET_THREAD_COUNT(POSITIVE_INTEGER, 1),
NET_BUFFER_COUNT(POSITIVE_INTEGER, 1),
- RESULT_TTL(LONG, 86400000L),
+ RESULT_TTL(LONG, 3600000L),
RESULT_SWEEP_THRESHOLD(LONG, 60000L),
RESULT_MANAGER_MEMORY(INTEGER_BYTE_UNIT, -1),
@SuppressWarnings("RedundantCast") // not redundant- false positive
from IDEA
@@ -568,12 +568,12 @@
configManager.set(nodeId, Option.NET_BUFFER_COUNT, netBufferCount);
}
- public long getResultTTL() {
+ public long getResultTTLMillis() {
return appConfig.getLong(Option.RESULT_TTL);
}
- public void setResultTTL(long resultTTL) {
- configManager.set(nodeId, Option.RESULT_TTL, resultTTL);
+ public void setResultTTLMillis(long resultTTLMillis) {
+ configManager.set(nodeId, Option.RESULT_TTL, resultTTLMillis);
}
public long getResultSweepThreshold() {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/result/AbstractResultManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/result/AbstractResultManager.java
index 46e6f2d..1aff09f 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/result/AbstractResultManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/result/AbstractResultManager.java
@@ -50,7 +50,7 @@
}
private boolean hasExpired(IResultStateRecord state, long currentTime) {
- long completeTimestamp = state.getCompleteTimestamp();
+ long completeTimestamp = state.getCompleteTimestampNanos();
if (completeTimestamp <= 0) {
// Not completed yet, not expired
return false;
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index f3df6a8..a0c2185 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -265,7 +265,7 @@
private void init() {
resultPartitionManager = new ResultPartitionManager(this, executor,
ncConfig.getResultManagerMemory(),
- ncConfig.getResultTTL(), ncConfig.getResultSweepThreshold());
+ ncConfig.getResultTTLMillis(),
ncConfig.getResultSweepThreshold());
resultNetworkManager = new
ResultNetworkManager(ncConfig.getResultListenAddress(),
ncConfig.getResultListenPort(), resultPartitionManager,
ncConfig.getNetThreadCount(),
ncConfig.getNetBufferCount(),
ncConfig.getResultPublicAddress(), ncConfig.getResultPublicPort(),
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
index 963def0..b13988a 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
@@ -55,9 +55,9 @@
private final ResultMemoryManager resultMemoryManager;
- public ResultPartitionManager(NodeControllerService ncs, Executor
executor, int availableMemory, long resultTTL,
- long resultSweepThreshold) {
- super(resultTTL);
+ public ResultPartitionManager(NodeControllerService ncs, Executor
executor, int availableMemory,
+ long resultTTLMillis, long resultSweepThreshold) {
+ super(resultTTLMillis);
this.ncs = ncs;
this.executor = executor;
deallocatableRegistry = new DefaultDeallocatableRegistry();
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java
index bb73f84..35b69e4 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java
@@ -45,12 +45,12 @@
}
@Override
- public long getTimestamp() {
+ public long getTimestampNanos() {
return timestamp;
}
@Override
- public long getCompleteTimestamp() {
+ public long getCompleteTimestampNanos() {
return completeTimestamp;
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21279?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: Ide6d7df9e522fe96e45808a2fd6b7eb802477093
Gerrit-Change-Number: 21279
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>