Jackie-Jiang commented on code in PR #9356:
URL: https://github.com/apache/pinot/pull/9356#discussion_r984958874
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -20,7 +20,6 @@
import java.io.File;
-
Review Comment:
(format) This blank line shouldn't be removed.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
return result;
}
+ @POST
+ @Path("table/{tableName}/timeBoundary")
Review Comment:
```suggestion
@Path("tables/{tableName}/timeBoundary")
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
return result;
}
+ @POST
+ @Path("table/{tableName}/timeBoundary")
+ @ApiOperation(value = "Set hybrid table query time boundary based on offline
segments' metadata",
+ notes = "Set hybrid table query time boundary based on offline segments'
metadata")
+ public SuccessResponse setEnforcedQueryTimeBoundary(
+ @ApiParam(value = "Name of the hybrid table (without type suffix)",
required = true) @PathParam("tableName")
+ String tableName)
+ throws Exception {
+ // Validate its a hybrid table
+ if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) ||
!_pinotHelixResourceManager.hasOfflineTable(
+ tableName)) {
+ throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid
table", Response.Status.BAD_REQUEST);
+ }
+
+ String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+ // Call all servers to validate offline table state
+ Map<String, List<String>> serverToSegments =
_pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
+ BiMap<String, String> serverEndPoints =
+
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+ CompletionServiceHelper completionServiceHelper =
+ new CompletionServiceHelper(_executor, _connectionManager,
serverEndPoints);
+ List<String> serverUrls = new ArrayList<>();
+ BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+ for (String endpoint : endpointsToServers.keySet()) {
+ String reloadTaskStatusEndpoint = endpoint + "/tables/" +
offlineTableName + "/allSegmentsLoaded";
+ serverUrls.add(reloadTaskStatusEndpoint);
+ }
+
+ CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+ completionServiceHelper.doMultiGetRequest(serverUrls, null, true,
10000);
+
+ if (serviceResponse._failedResponseCount > 0) {
+ throw new ControllerApplicationException(LOGGER, "Could not validate
table segment status",
+ Response.Status.SERVICE_UNAVAILABLE);
+ }
+
+ Long timeBoundary = null;
+ // Validate all responses
+ for (String response : serviceResponse._httpResponses.values()) {
+ TableSegmentValidationInfo tableSegmentValidationInfo =
+ JsonUtils.stringToObject(response, TableSegmentValidationInfo.class);
+ if (!tableSegmentValidationInfo.isValid()) {
+ throw new ControllerApplicationException(LOGGER, "Table segment
validation failed",
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ timeBoundary = Math.max((timeBoundary == null) ? -1 : timeBoundary,
tableSegmentValidationInfo.getMaxTimestamp());
+ }
+
+ if (timeBoundary == null) {
+ throw new ControllerApplicationException(LOGGER, "Could not validate
table segment status",
+ Response.Status.SERVICE_UNAVAILABLE);
+ }
+
+ final String timeBoundaryFinal = String.valueOf(timeBoundary);
+
+ // Set the timeBoundary in tableIdealState
+ IdealState idealState =
+
HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(),
offlineTableName, is -> {
+ assert is != null;
+
is.getRecord().setSimpleField(CommonConstants.IdealState.QUERY_TIME_BOUNDARY,
timeBoundaryFinal);
+ return is;
+ }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f));
+
+ if (idealState == null) {
+ throw new ControllerApplicationException(LOGGER, "Could not update time
boundary",
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+
+ return new SuccessResponse("Time boundary updated successfully to " +
timeBoundaryFinal);
+ }
+
+ @DELETE
+ @Path("table/{tableName}/timeBoundary")
+ @ApiOperation(value = "Delete hybrid table query time boundary", notes =
"Delete hybrid table query time boundary")
+ public SuccessResponse deleteEnforcedQueryTimeBoundary(
+ @ApiParam(value = "Name of the hybrid table (without type suffix)",
required = true) @PathParam("tableName")
+ String tableName)
+ throws Exception {
+ // Validate its a hybrid table
+ if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) ||
!_pinotHelixResourceManager.hasOfflineTable(
Review Comment:
We probably don't want to check the realtime table. I can see a corner case
where a user wants to remove the realtime table and keep only the offline table.
We can instead check that the offline table ideal state exists and has the time
boundary set.
##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -514,4 +526,59 @@ public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
}
return segmentConsumerInfoList;
}
+
+ @GET
+ @Path("tables/{tableNameWithType}/allSegmentsLoaded")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Validates if the ideal state matches with the
segmentstate on this server", notes =
+ "Validates if the ideal state matches with the segmentstate on this
server")
+ public TableSegmentValidationInfo validateTableSegmentState(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableNameWithType")
+ String tableNameWithType) {
+ // Get table current ideal state
+ IdealState tableIdealState =
HelixHelper.getTableIdealState(_serverInstance.getHelixManager(),
tableNameWithType);
+ TableDataManager tableDataManager =
+ ServerResourceUtils.checkGetTableDataManager(_serverInstance,
tableNameWithType);
+
+ // Validate segments in idealstate which belong to this server
+ long maxEndTime = -1;
+ Map<String, Map<String, String>> instanceStatesMap =
tableIdealState.getRecord().getMapFields();
+ for (Map.Entry<String, Map<String, String>> kv :
instanceStatesMap.entrySet()) {
+ String segmentName = kv.getKey();
+ if (kv.getValue().containsKey(_instanceId)) {
+ // Segment hosted by this server. Validate segment state
+ SegmentDataManager segmentDataManager =
tableDataManager.acquireSegment(segmentName);
+ try {
+ String segmentState = kv.getValue().get(_instanceId);
+
+ switch (segmentState) {
+ case CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING:
+ // Only validate presence of segment
+ if (segmentDataManager == null) {
+ return new TableSegmentValidationInfo(false, -1);
+ }
+ break;
+ case CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE:
+ // Validate segment CRC
+ SegmentZKMetadata zkMetadata =
+
ZKMetadataProvider.getSegmentZKMetadata(_serverInstance.getHelixManager().getHelixPropertyStore(),
+ tableNameWithType, segmentName);
Review Comment:
Consider throwing a proper exception when `zkMetadata` is `null`; otherwise it
will hit an NPE here.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
return result;
}
+ @POST
+ @Path("table/{tableName}/timeBoundary")
+ @ApiOperation(value = "Set hybrid table query time boundary based on offline
segments' metadata",
+ notes = "Set hybrid table query time boundary based on offline segments'
metadata")
+ public SuccessResponse setEnforcedQueryTimeBoundary(
+ @ApiParam(value = "Name of the hybrid table (without type suffix)",
required = true) @PathParam("tableName")
+ String tableName)
+ throws Exception {
+ // Validate its a hybrid table
+ if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) ||
!_pinotHelixResourceManager.hasOfflineTable(
+ tableName)) {
+ throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid
table", Response.Status.BAD_REQUEST);
+ }
+
+ String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+ // Call all servers to validate offline table state
+ Map<String, List<String>> serverToSegments =
_pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
Review Comment:
Let's create a helper method for validating the segment state. We may add a
separate API for it, which can be useful for debugging.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
return result;
}
+ @POST
+ @Path("table/{tableName}/timeBoundary")
+ @ApiOperation(value = "Set hybrid table query time boundary based on offline
segments' metadata",
+ notes = "Set hybrid table query time boundary based on offline segments'
metadata")
+ public SuccessResponse setEnforcedQueryTimeBoundary(
+ @ApiParam(value = "Name of the hybrid table (without type suffix)",
required = true) @PathParam("tableName")
+ String tableName)
+ throws Exception {
+ // Validate its a hybrid table
+ if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) ||
!_pinotHelixResourceManager.hasOfflineTable(
+ tableName)) {
+ throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid
table", Response.Status.BAD_REQUEST);
+ }
+
+ String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+ // Call all servers to validate offline table state
+ Map<String, List<String>> serverToSegments =
_pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
+ BiMap<String, String> serverEndPoints =
+
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+ CompletionServiceHelper completionServiceHelper =
+ new CompletionServiceHelper(_executor, _connectionManager,
serverEndPoints);
+ List<String> serverUrls = new ArrayList<>();
+ BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+ for (String endpoint : endpointsToServers.keySet()) {
+ String reloadTaskStatusEndpoint = endpoint + "/tables/" +
offlineTableName + "/allSegmentsLoaded";
+ serverUrls.add(reloadTaskStatusEndpoint);
+ }
+
+ CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+ completionServiceHelper.doMultiGetRequest(serverUrls, null, true,
10000);
+
+ if (serviceResponse._failedResponseCount > 0) {
+ throw new ControllerApplicationException(LOGGER, "Could not validate
table segment status",
+ Response.Status.SERVICE_UNAVAILABLE);
+ }
+
+ Long timeBoundary = null;
+ // Validate all responses
+ for (String response : serviceResponse._httpResponses.values()) {
+ TableSegmentValidationInfo tableSegmentValidationInfo =
+ JsonUtils.stringToObject(response, TableSegmentValidationInfo.class);
+ if (!tableSegmentValidationInfo.isValid()) {
+ throw new ControllerApplicationException(LOGGER, "Table segment
validation failed",
+ Response.Status.INTERNAL_SERVER_ERROR);
Review Comment:
We should pick a different status
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -142,17 +153,42 @@ private long
extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Null
return endTimeMs;
}
- private void updateTimeBoundaryInfo(long maxEndTimeMs) {
- if (maxEndTimeMs > 0) {
- String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs -
_timeOffsetMs);
- TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
- if (currentTimeBoundaryInfo == null ||
!currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
- _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
- LOGGER.info("Updated time boundary to: {} for table: {}",
timeBoundary, _offlineTableName);
+ private synchronized void updateTimeBoundaryInfo(Long enforcedTimeBoundary,
long maxEndTimeMs,
+ boolean idealStateReffered) {
+ TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
+ Long finalTimeBoundaryMs = null;
+ boolean isEnforced = false;
+ boolean validTimeBoundaryFound = false;
+
+ if (enforcedTimeBoundary != null) {
Review Comment:
This part is quite complicated. It will be good if we can simplify the
logic, or put some comments explaining the logic under each if branch
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -142,17 +153,42 @@ private long
extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Null
return endTimeMs;
}
- private void updateTimeBoundaryInfo(long maxEndTimeMs) {
- if (maxEndTimeMs > 0) {
- String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs -
_timeOffsetMs);
- TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
- if (currentTimeBoundaryInfo == null ||
!currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
- _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
- LOGGER.info("Updated time boundary to: {} for table: {}",
timeBoundary, _offlineTableName);
+ private synchronized void updateTimeBoundaryInfo(Long enforcedTimeBoundary,
long maxEndTimeMs,
Review Comment:
(minor) Consider following the same convention of using -1 to represent the
invalid value
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -142,17 +153,42 @@ private long
extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Null
return endTimeMs;
}
- private void updateTimeBoundaryInfo(long maxEndTimeMs) {
- if (maxEndTimeMs > 0) {
- String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs -
_timeOffsetMs);
- TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
- if (currentTimeBoundaryInfo == null ||
!currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
- _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
- LOGGER.info("Updated time boundary to: {} for table: {}",
timeBoundary, _offlineTableName);
+ private synchronized void updateTimeBoundaryInfo(Long enforcedTimeBoundary,
long maxEndTimeMs,
+ boolean idealStateReffered) {
+ TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
+ Long finalTimeBoundaryMs = null;
+ boolean isEnforced = false;
+ boolean validTimeBoundaryFound = false;
+
+ if (enforcedTimeBoundary != null) {
+ finalTimeBoundaryMs = enforcedTimeBoundary;
+ isEnforced = true;
+ validTimeBoundaryFound = true;
+ LOGGER.info("Enforced table time boundary in use: {} for table: {}",
enforcedTimeBoundary, _offlineTableName);
+ } else if (idealStateReffered || !currentTimeBoundaryInfo.isEnforced()) {
+ if (maxEndTimeMs > 0) {
+ finalTimeBoundaryMs = maxEndTimeMs - _timeOffsetMs;
+ validTimeBoundaryFound = true;
+ } else {
+ LOGGER.warn("Failed to find segment with valid end time for table: {},
no time boundary generated",
+ _offlineTableName);
+ }
+ } else {
+ validTimeBoundaryFound = true;
+ LOGGER.info("Skipping time boundary update since enforced time boundary
exists");
+ }
+
+ if (validTimeBoundaryFound) {
+ if (finalTimeBoundaryMs != null) {
+ String timeBoundary =
_timeFormatSpec.fromMillisToFormat(finalTimeBoundaryMs);
+ if (currentTimeBoundaryInfo == null ||
!currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
Review Comment:
Here we also want to check if the explicitly set flag is the same
##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -514,4 +526,59 @@ public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
}
return segmentConsumerInfoList;
}
+
+ @GET
+ @Path("tables/{tableNameWithType}/allSegmentsLoaded")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Validates if the ideal state matches with the
segmentstate on this server", notes =
+ "Validates if the ideal state matches with the segmentstate on this
server")
Review Comment:
(minor)
```suggestion
@ApiOperation(value = "Validates if the ideal state matches with the
segment state on this server", notes =
"Validates if the ideal state matches with the segment state on this
server")
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
return result;
}
+ @POST
+ @Path("table/{tableName}/timeBoundary")
+ @ApiOperation(value = "Set hybrid table query time boundary based on offline
segments' metadata",
+ notes = "Set hybrid table query time boundary based on offline segments'
metadata")
+ public SuccessResponse setEnforcedQueryTimeBoundary(
+ @ApiParam(value = "Name of the hybrid table (without type suffix)",
required = true) @PathParam("tableName")
+ String tableName)
+ throws Exception {
+ // Validate its a hybrid table
+ if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) ||
!_pinotHelixResourceManager.hasOfflineTable(
+ tableName)) {
+ throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid
table", Response.Status.BAD_REQUEST);
+ }
+
+ String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+ // Call all servers to validate offline table state
+ Map<String, List<String>> serverToSegments =
_pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
+ BiMap<String, String> serverEndPoints =
+
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+ CompletionServiceHelper completionServiceHelper =
+ new CompletionServiceHelper(_executor, _connectionManager,
serverEndPoints);
+ List<String> serverUrls = new ArrayList<>();
+ BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+ for (String endpoint : endpointsToServers.keySet()) {
+ String reloadTaskStatusEndpoint = endpoint + "/tables/" +
offlineTableName + "/allSegmentsLoaded";
+ serverUrls.add(reloadTaskStatusEndpoint);
+ }
+
+ CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+ completionServiceHelper.doMultiGetRequest(serverUrls, null, true,
10000);
+
+ if (serviceResponse._failedResponseCount > 0) {
+ throw new ControllerApplicationException(LOGGER, "Could not validate
table segment status",
+ Response.Status.SERVICE_UNAVAILABLE);
+ }
+
+ Long timeBoundary = null;
+ // Validate all responses
+ for (String response : serviceResponse._httpResponses.values()) {
+ TableSegmentValidationInfo tableSegmentValidationInfo =
+ JsonUtils.stringToObject(response, TableSegmentValidationInfo.class);
+ if (!tableSegmentValidationInfo.isValid()) {
+ throw new ControllerApplicationException(LOGGER, "Table segment
validation failed",
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ timeBoundary = Math.max((timeBoundary == null) ? -1 : timeBoundary,
tableSegmentValidationInfo.getMaxTimestamp());
+ }
+
+ if (timeBoundary == null) {
+ throw new ControllerApplicationException(LOGGER, "Could not validate
table segment status",
+ Response.Status.SERVICE_UNAVAILABLE);
+ }
+
+ final String timeBoundaryFinal = String.valueOf(timeBoundary);
+
+ // Set the timeBoundary in tableIdealState
+ IdealState idealState =
+
HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(),
offlineTableName, is -> {
+ assert is != null;
+
is.getRecord().setSimpleField(CommonConstants.IdealState.QUERY_TIME_BOUNDARY,
timeBoundaryFinal);
+ return is;
+ }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f));
+
+ if (idealState == null) {
+ throw new ControllerApplicationException(LOGGER, "Could not update time
boundary",
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+
+ return new SuccessResponse("Time boundary updated successfully to " +
timeBoundaryFinal);
+ }
+
+ @DELETE
+ @Path("table/{tableName}/timeBoundary")
+ @ApiOperation(value = "Delete hybrid table query time boundary", notes =
"Delete hybrid table query time boundary")
+ public SuccessResponse deleteEnforcedQueryTimeBoundary(
+ @ApiParam(value = "Name of the hybrid table (without type suffix)",
required = true) @PathParam("tableName")
+ String tableName)
+ throws Exception {
+ // Validate its a hybrid table
+ if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) ||
!_pinotHelixResourceManager.hasOfflineTable(
+ tableName)) {
+ throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid
table", Response.Status.BAD_REQUEST);
+ }
+
+ String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+ // Delete the timeBoundary in tableIdealState
+ IdealState idealState =
+
HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(),
offlineTableName, is -> {
+ assert is != null;
Review Comment:
We cannot make this assertion because the input `is` can be `null`, and we
should throw an exception in that case.
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -822,4 +821,8 @@ public static class Range {
public static final String UPPER_UNBOUNDED = DELIMITER + UNBOUNDED +
UPPER_EXCLUSIVE;
}
}
+
+ public static class IdealState {
+ public static final String QUERY_TIME_BOUNDARY =
"HYBRID_TABLE_TIME_BOUNDARY";
Review Comment:
```suggestion
public static final String HYBRID_TABLE_TIME_BOUNDARY =
"HYBRID_TABLE_TIME_BOUNDARY";
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
return result;
}
+ @POST
+ @Path("table/{tableName}/timeBoundary")
+ @ApiOperation(value = "Set hybrid table query time boundary based on offline
segments' metadata",
+ notes = "Set hybrid table query time boundary based on offline segments'
metadata")
+ public SuccessResponse setEnforcedQueryTimeBoundary(
+ @ApiParam(value = "Name of the hybrid table (without type suffix)",
required = true) @PathParam("tableName")
+ String tableName)
+ throws Exception {
+ // Validate its a hybrid table
+ if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) ||
!_pinotHelixResourceManager.hasOfflineTable(
+ tableName)) {
+ throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid
table", Response.Status.BAD_REQUEST);
+ }
+
+ String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+ // Call all servers to validate offline table state
+ Map<String, List<String>> serverToSegments =
_pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
+ BiMap<String, String> serverEndPoints =
+
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+ CompletionServiceHelper completionServiceHelper =
+ new CompletionServiceHelper(_executor, _connectionManager,
serverEndPoints);
+ List<String> serverUrls = new ArrayList<>();
+ BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+ for (String endpoint : endpointsToServers.keySet()) {
+ String reloadTaskStatusEndpoint = endpoint + "/tables/" +
offlineTableName + "/allSegmentsLoaded";
+ serverUrls.add(reloadTaskStatusEndpoint);
+ }
+
+ CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+ completionServiceHelper.doMultiGetRequest(serverUrls, null, true,
10000);
+
+ if (serviceResponse._failedResponseCount > 0) {
+ throw new ControllerApplicationException(LOGGER, "Could not validate
table segment status",
+ Response.Status.SERVICE_UNAVAILABLE);
+ }
+
+ Long timeBoundary = null;
+ // Validate all responses
+ for (String response : serviceResponse._httpResponses.values()) {
+ TableSegmentValidationInfo tableSegmentValidationInfo =
+ JsonUtils.stringToObject(response, TableSegmentValidationInfo.class);
+ if (!tableSegmentValidationInfo.isValid()) {
+ throw new ControllerApplicationException(LOGGER, "Table segment
validation failed",
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ timeBoundary = Math.max((timeBoundary == null) ? -1 : timeBoundary,
tableSegmentValidationInfo.getMaxTimestamp());
+ }
+
+ if (timeBoundary == null) {
+ throw new ControllerApplicationException(LOGGER, "Could not validate
table segment status",
+ Response.Status.SERVICE_UNAVAILABLE);
+ }
+
+ final String timeBoundaryFinal = String.valueOf(timeBoundary);
+
+ // Set the timeBoundary in tableIdealState
+ IdealState idealState =
+
HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(),
offlineTableName, is -> {
+ assert is != null;
Review Comment:
We cannot make this assertion because the input `is` can be `null`, and we
should throw an exception in that case.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
return result;
}
+ @POST
+ @Path("table/{tableName}/timeBoundary")
+ @ApiOperation(value = "Set hybrid table query time boundary based on offline
segments' metadata",
+ notes = "Set hybrid table query time boundary based on offline segments'
metadata")
+ public SuccessResponse setEnforcedQueryTimeBoundary(
+ @ApiParam(value = "Name of the hybrid table (without type suffix)",
required = true) @PathParam("tableName")
+ String tableName)
+ throws Exception {
+ // Validate its a hybrid table
+ if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) ||
!_pinotHelixResourceManager.hasOfflineTable(
+ tableName)) {
+ throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid
table", Response.Status.BAD_REQUEST);
+ }
+
+ String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+ // Call all servers to validate offline table state
+ Map<String, List<String>> serverToSegments =
_pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
+ BiMap<String, String> serverEndPoints =
+
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+ CompletionServiceHelper completionServiceHelper =
+ new CompletionServiceHelper(_executor, _connectionManager,
serverEndPoints);
+ List<String> serverUrls = new ArrayList<>();
+ BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+ for (String endpoint : endpointsToServers.keySet()) {
+ String reloadTaskStatusEndpoint = endpoint + "/tables/" +
offlineTableName + "/allSegmentsLoaded";
+ serverUrls.add(reloadTaskStatusEndpoint);
+ }
+
+ CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+ completionServiceHelper.doMultiGetRequest(serverUrls, null, true,
10000);
+
+ if (serviceResponse._failedResponseCount > 0) {
+ throw new ControllerApplicationException(LOGGER, "Could not validate
table segment status",
+ Response.Status.SERVICE_UNAVAILABLE);
+ }
+
+ Long timeBoundary = null;
Review Comment:
```suggestion
long timeBoundaryMs = -1;
```
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -142,17 +153,42 @@ private long
extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Null
return endTimeMs;
}
- private void updateTimeBoundaryInfo(long maxEndTimeMs) {
- if (maxEndTimeMs > 0) {
- String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs -
_timeOffsetMs);
- TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
- if (currentTimeBoundaryInfo == null ||
!currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
- _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
- LOGGER.info("Updated time boundary to: {} for table: {}",
timeBoundary, _offlineTableName);
+ private synchronized void updateTimeBoundaryInfo(Long enforcedTimeBoundary,
long maxEndTimeMs,
+ boolean idealStateReffered) {
+ TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
+ Long finalTimeBoundaryMs = null;
+ boolean isEnforced = false;
+ boolean validTimeBoundaryFound = false;
+
+ if (enforcedTimeBoundary != null) {
+ finalTimeBoundaryMs = enforcedTimeBoundary;
+ isEnforced = true;
+ validTimeBoundaryFound = true;
+ LOGGER.info("Enforced table time boundary in use: {} for table: {}",
enforcedTimeBoundary, _offlineTableName);
+ } else if (idealStateReffered || !currentTimeBoundaryInfo.isEnforced()) {
+ if (maxEndTimeMs > 0) {
+ finalTimeBoundaryMs = maxEndTimeMs - _timeOffsetMs;
+ validTimeBoundaryFound = true;
+ } else {
+ LOGGER.warn("Failed to find segment with valid end time for table: {},
no time boundary generated",
+ _offlineTableName);
+ }
+ } else {
+ validTimeBoundaryFound = true;
+ LOGGER.info("Skipping time boundary update since enforced time boundary
exists");
+ }
+
+ if (validTimeBoundaryFound) {
+ if (finalTimeBoundaryMs != null) {
Review Comment:
We always want to update the metrics. Currently, when a user refreshes a
segment, the metric won't be updated.
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -142,17 +153,42 @@ private long
extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Null
return endTimeMs;
}
- private void updateTimeBoundaryInfo(long maxEndTimeMs) {
- if (maxEndTimeMs > 0) {
- String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs -
_timeOffsetMs);
- TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
- if (currentTimeBoundaryInfo == null ||
!currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
- _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
- LOGGER.info("Updated time boundary to: {} for table: {}",
timeBoundary, _offlineTableName);
+ private synchronized void updateTimeBoundaryInfo(Long enforcedTimeBoundary,
long maxEndTimeMs,
Review Comment:
(minor) We don't need to synchronize on it because it can only be called in
the synchronized method
##########
pinot-core/src/main/java/org/apache/pinot/core/routing/TimeBoundaryInfo.java:
##########
@@ -21,10 +21,12 @@
public class TimeBoundaryInfo {
private final String _timeColumn;
private final String _timeValue;
+ private final boolean _enforced;
Review Comment:
(optional) `_explicitlySet` might be more clear
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -142,17 +153,42 @@ private long
extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Null
return endTimeMs;
}
- private void updateTimeBoundaryInfo(long maxEndTimeMs) {
- if (maxEndTimeMs > 0) {
- String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs -
_timeOffsetMs);
- TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
- if (currentTimeBoundaryInfo == null ||
!currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
- _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
- LOGGER.info("Updated time boundary to: {} for table: {}",
timeBoundary, _offlineTableName);
+ private synchronized void updateTimeBoundaryInfo(Long enforcedTimeBoundary,
long maxEndTimeMs,
+ boolean idealStateReffered) {
Review Comment:
(minor)
```suggestion
boolean idealStateReferred) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]