This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b836e86889 Fix job submission time for reload and foce commit job
(#11803)
b836e86889 is described below
commit b836e868895b55d8fbb01d5f95c1164ac458f4be
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Oct 16 11:47:00 2023 -0700
Fix job submission time for reload and foce commit job (#11803)
---
.../api/resources/PinotRealtimeTableResource.java | 8 ++++----
.../api/resources/PinotSegmentRestletResource.java | 5 ++++-
.../helix/core/PinotHelixResourceManager.java | 17 ++++++++++-------
3 files changed, 18 insertions(+), 12 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index 1c0083494f..404e7553db 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.controller.api.resources;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
@@ -139,8 +138,8 @@ public class PinotRealtimeTableResource {
+ "Please note that this is an asynchronous operation, "
+ "and 200 response does not mean it has actually been done already")
public Map<String, String> forceCommit(
- @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName)
- throws JsonProcessingException {
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName) {
+ long startTimeMs = System.currentTimeMillis();
String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
validate(tableNameWithType);
Map<String, String> response = new HashMap<>();
@@ -149,7 +148,8 @@ public class PinotRealtimeTableResource {
response.put("forceCommitStatus", "SUCCESS");
try {
String jobId = UUID.randomUUID().toString();
- _pinotHelixResourceManager.addNewForceCommitJob(tableNameWithType,
jobId, consumingSegmentsForceCommitted);
+ _pinotHelixResourceManager.addNewForceCommitJob(tableNameWithType,
jobId, startTimeMs,
+ consumingSegmentsForceCommitted);
response.put("jobMetaZKWriteStatus", "SUCCESS");
response.put("forceCommitJobId", jobId);
} catch (Exception e) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 5db7d8c1c3..33d4b65b5d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -480,6 +480,7 @@ public class PinotSegmentRestletResource {
@ApiParam(value = "Name of the segment", required = true)
@PathParam("segmentName") @Encoded String segmentName,
@ApiParam(value = "Whether to force server to download segment")
@QueryParam("forceDownload")
@DefaultValue("false") boolean forceDownload) {
+ long startTimeMs = System.currentTimeMillis();
segmentName = URIUtils.decode(segmentName);
String tableNameWithType = getExistingTable(tableName, segmentName);
Pair<Integer, String> msgInfo =
@@ -488,6 +489,7 @@ public class PinotSegmentRestletResource {
if (msgInfo.getLeft() > 0) {
try {
if
(_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType,
segmentName, msgInfo.getRight(),
+ startTimeMs,
msgInfo.getLeft())) {
zkJobMetaWriteSuccess = true;
} else {
@@ -749,6 +751,7 @@ public class PinotSegmentRestletResource {
@ApiParam(value = "Whether to force server to download segment")
@QueryParam("forceDownload")
@DefaultValue("false") boolean forceDownload)
throws JsonProcessingException {
+ long startTimeMs = System.currentTimeMillis();
TableType tableTypeFromTableName =
TableNameBuilder.getTableTypeFromTableName(tableName);
TableType tableTypeFromRequest = Constants.validateTableType(tableTypeStr);
// When rawTableName is provided but w/o table type, Pinot tries to reload
both OFFLINE
@@ -771,7 +774,7 @@ public class PinotSegmentRestletResource {
perTableMsgData.put(tableNameWithType, tableReloadMeta);
// Store in ZK
try {
- if
(_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType,
msgInfo.getRight(),
+ if
(_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType,
msgInfo.getRight(), startTimeMs,
msgInfo.getLeft())) {
tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS");
} else {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 81dec9a34e..c3fde8a85d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2028,32 +2028,33 @@ public class PinotHelixResourceManager {
* @param tableNameWithType Table for which job is to be added
* @param segmentName Name of the segment being reloaded
* @param jobId job's UUID
+ * @param jobSubmissionTimeMs time at which the job was submitted
* @param numMessagesSent number of messages that were sent to servers.
Saved as metadata
* @return boolean representing success / failure of the ZK write step
*/
public boolean addNewReloadSegmentJob(String tableNameWithType, String
segmentName, String jobId,
- int numMessagesSent) {
+ long jobSubmissionTimeMs, int numMessagesSent) {
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE,
tableNameWithType);
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.RELOAD_SEGMENT.toString());
- jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(System.currentTimeMillis()));
+ jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT,
Integer.toString(numMessagesSent));
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME,
segmentName);
return addControllerJobToZK(jobId, jobMetadata,
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.RELOAD_SEGMENT));
}
- public boolean addNewForceCommitJob(String tableNameWithType, String jobId,
Set<String> consumingSegmentsCommitted)
+ public boolean addNewForceCommitJob(String tableNameWithType, String jobId,
long jobSubmissionTimeMs,
+ Set<String> consumingSegmentsCommitted)
throws JsonProcessingException {
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE,
tableNameWithType);
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.FORCE_COMMIT.toString());
- jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(System.currentTimeMillis()));
+ jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST,
JsonUtils.objectToString(consumingSegmentsCommitted));
-
return addControllerJobToZK(jobId, jobMetadata,
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.FORCE_COMMIT));
}
@@ -2062,15 +2063,17 @@ public class PinotHelixResourceManager {
* Adds a new reload segment job metadata into ZK
* @param tableNameWithType Table for which job is to be added
* @param jobId job's UUID
+ * @param jobSubmissionTimeMs time at which the job was submitted
* @param numberOfMessagesSent number of messages that were sent to servers.
Saved as metadata
* @return boolean representing success / failure of the ZK write step
*/
- public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String
jobId, int numberOfMessagesSent) {
+ public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String
jobId, long jobSubmissionTimeMs,
+ int numberOfMessagesSent) {
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE,
tableNameWithType);
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.RELOAD_SEGMENT.toString());
- jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(System.currentTimeMillis()));
+ jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT,
Integer.toString(numberOfMessagesSent));
return addControllerJobToZK(jobId, jobMetadata,
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.RELOAD_SEGMENT));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]