Jackie-Jiang commented on code in PR #9356:
URL: https://github.com/apache/pinot/pull/9356#discussion_r975920438
##########
pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java:
##########
@@ -112,7 +111,7 @@ private void testDailyPushTable(String rawTableName,
TableConfig tableConfig, Ti
Map<String, String> offlineInstanceStateMap =
Collections.singletonMap("server", OFFLINE);
Set<String> onlineSegments = new HashSet<>();
// NOTE: Ideal state is not used in the current implementation.
- IdealState idealState = mock(IdealState.class);
+ IdealState idealState = new IdealState("");
Review Comment:
Do we need to change this?
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -105,6 +106,14 @@ public TimeBoundaryManager(TableConfig tableConfig,
ZkHelixPropertyStore<ZNRecor
@SuppressWarnings("unused")
public void init(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments) {
// Bulk load time info for all online segments
+ String enforcedTimeBoundary =
idealState.getRecord().getSimpleField(CommonConstants.IdealState.QUERY_TIME_BOUNDARY);
Review Comment:
When the time boundary is explicitly set, we should skip fetching the segment
ZK metadata because it won't be used.
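
A minimal sketch of the short-circuit, not the actual `TimeBoundaryManager`
code. `TimeBoundarySketch`, `metadataFetches`, and the `fetchEndTimeMs`
callback are hypothetical stand-ins for the real ZK metadata read, and the
derived boundary is simplified to the max segment end time:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.ToLongFunction;

// Hypothetical stand-in for the broker-side bookkeeping; not the Pinot class.
final class TimeBoundarySketch {
  private final Map<String, Long> endTimeMsMap = new HashMap<>();
  private volatile long timeBoundaryMs = -1;
  int metadataFetches = 0; // counts simulated ZK reads, for illustration only

  // explicitBoundary: value of the ideal-state simple field, or null when unset.
  void init(String explicitBoundary, Set<String> onlineSegments,
      ToLongFunction<String> fetchEndTimeMs) {
    if (explicitBoundary != null) {
      // The explicit boundary wins, so no per-segment metadata is read.
      timeBoundaryMs = Long.parseLong(explicitBoundary);
      return;
    }
    // Otherwise bulk load the end time of every online segment.
    long maxEndTimeMs = -1;
    for (String segment : onlineSegments) {
      metadataFetches++;
      long endTimeMs = fetchEndTimeMs.applyAsLong(segment);
      endTimeMsMap.put(segment, endTimeMs);
      maxEndTimeMs = Math.max(maxEndTimeMs, endTimeMs);
    }
    timeBoundaryMs = maxEndTimeMs;
  }

  long getTimeBoundaryMs() {
    return timeBoundaryMs;
  }
}
```

The same guard would apply to any other callback that only feeds the derived
boundary.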
##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -514,4 +529,47 @@ public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
}
return segmentConsumerInfoList;
}
+
+ @GET
+ @Path("tables/{tableNameWithType}/validate")
Review Comment:
Suggest a more explicit API path, such as `/allSegmentsLoaded`.
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -731,4 +730,8 @@ public static class Range {
public static final String UPPER_UNBOUNDED = DELIMITER + UNBOUNDED +
UPPER_EXCLUSIVE;
}
}
+
+ public static class IdealState {
+ public static final String QUERY_TIME_BOUNDARY = "query.time.boundary";
Review Comment:
(optional) I feel `TIME_BOUNDARY` is more concise. If we want to be more
specific, `HYBRID_TABLE_TIME_BOUNDARY` would also work.
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -63,6 +63,7 @@ public class TimeBoundaryManager {
private final Map<String, Long> _endTimeMsMap = new HashMap<>();
private volatile TimeBoundaryInfo _timeBoundaryInfo;
+ private volatile TimeBoundaryInfo _enforcedTimeBoundaryInfo;
Review Comment:
(minor) We shouldn't need to keep them separate. We can directly update
`_timeBoundaryInfo` when the time boundary is set in the ideal state (IS).
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,106 @@ public Map<String, Map<String, String>> getControllerJobs(
return result;
}
+ @POST
+ @Path("table/{tableName}/enforceQueryTimeBoundary")
+ @ApiOperation(value = "Set hybrid table query time boundary based on offline
segments' metadata",
+ notes = "Set hybrid table query time boundary based on offline segments'
metadata")
+ public SuccessResponse setEnforcedQueryTimeBoundary(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName)
+ throws Exception {
+ // Validate its a hybrid table
+ if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) ||
!_pinotHelixResourceManager.hasOfflineTable(
+ tableName)) {
+ throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid
table", Response.Status.BAD_REQUEST);
+ }
+
+ String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+ // Call all servers to validate offline table state
+ Map<String, List<String>> serverToSegments =
_pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
+ BiMap<String, String> serverEndPoints =
+
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+ CompletionServiceHelper completionServiceHelper =
+ new CompletionServiceHelper(_executor, _connectionManager,
serverEndPoints);
+ List<String> serverUrls = new ArrayList<>();
+ BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+ for (String endpoint : endpointsToServers.keySet()) {
+ String reloadTaskStatusEndpoint = endpoint + "/tables/" +
offlineTableName + "/validate";
Review Comment:
Let's make the validation a separate API because it can also be useful for
other purposes.
##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -514,4 +529,47 @@ public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
}
return segmentConsumerInfoList;
}
+
+ @GET
+ @Path("tables/{tableNameWithType}/validate")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Validates if the ideal state matches with the
segmentstate on this server", notes =
+ "Validates if the ideal state matches with the segmentstate on this
server")
+ public TableSegmentValidationInfo validateTableIdealState(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableNameWithType")
+ String tableNameWithType) {
+ // Get table current ideal state
+ IdealState tableIdealState = HelixHelper.getTableIdealState(_helixManager,
tableNameWithType);
+ TableDataManager tableDataManager =
+ ServerResourceUtils.checkGetTableDataManager(_serverInstance,
tableNameWithType);
+
+ // Validate segments in idealstate which belong to this server
+ long maxEndTime = -1;
+ Map<String, Map<String, String>> instanceStatesMap =
tableIdealState.getRecord().getMapFields();
+ for (Map.Entry<String, Map<String, String>> kv :
instanceStatesMap.entrySet()) {
+ String segmentName = kv.getKey();
+ if (kv.getValue().containsKey(_instanceId)) {
Review Comment:
We want to check the state of this server in the IS. For `ONLINE`, check the
CRC; for `CONSUMING`, check that the segment exists; skip other states.
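
A rough sketch of that per-state check, under assumptions: the ideal state is
modelled as the plain segment -> (instance -> state) map from the diff, while
`SegmentValidationSketch`, `LoadedSegment`, and the `expectedCrc` map are
hypothetical stand-ins for the table data manager and the segment ZK metadata:

```java
import java.util.Map;

// Hypothetical helper; not the Pinot TablesResource implementation.
final class SegmentValidationSketch {
  // Hypothetical view of a segment loaded on this server.
  static final class LoadedSegment {
    final String crc;

    LoadedSegment(String crc) {
      this.crc = crc;
    }
  }

  // Returns true when every segment assigned to instanceId is in the expected shape.
  static boolean allSegmentsLoaded(String instanceId,
      Map<String, Map<String, String>> idealState,
      Map<String, LoadedSegment> loaded,
      Map<String, String> expectedCrc) {
    for (Map.Entry<String, Map<String, String>> entry : idealState.entrySet()) {
      String segment = entry.getKey();
      String state = entry.getValue().get(instanceId);
      if (state == null) {
        continue; // segment is not assigned to this server
      }
      LoadedSegment local = loaded.get(segment);
      switch (state) {
        case "ONLINE":
          // Must be loaded and match the expected CRC.
          if (local == null || !local.crc.equals(expectedCrc.get(segment))) {
            return false;
          }
          break;
        case "CONSUMING":
          // Only existence is checked; the CRC is not compared here.
          if (local == null) {
            return false;
          }
          break;
        default:
          break; // all other states are skipped
      }
    }
    return true;
  }
}
```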
##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/AdminApiApplication.java:
##########
@@ -58,7 +59,7 @@ public class AdminApiApplication extends ResourceConfig {
private HttpServer _httpServer;
public AdminApiApplication(ServerInstance instance, AccessControlFactory
accessControlFactory,
- PinotConfiguration serverConf) {
+ PinotConfiguration serverConf, HelixManager helixManager) {
Review Comment:
(minor) We can pass `HelixManager` through the `ServerInstance`. We already
pass it in the `ServerInstance` constructor
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,106 @@ public Map<String, Map<String, String>> getControllerJobs(
return result;
}
+ @POST
+ @Path("table/{tableName}/enforceQueryTimeBoundary")
+ @ApiOperation(value = "Set hybrid table query time boundary based on offline
segments' metadata",
+ notes = "Set hybrid table query time boundary based on offline segments'
metadata")
+ public SuccessResponse setEnforcedQueryTimeBoundary(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName)
Review Comment:
The input has to be a raw table name (without the type suffix), or the table
validation will fail. Consider revising the parameter description to be more
explicit.
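
For illustration, a small guard that rejects type-suffixed names up front.
`RawTableNameSketch` and its methods are hypothetical and only mirror the
`_OFFLINE` / `_REALTIME` suffix convention; the real code would presumably go
through `TableNameBuilder` instead:

```java
// Hypothetical helper; not part of Pinot.
final class RawTableNameSketch {
  private static final String OFFLINE_SUFFIX = "_OFFLINE";
  private static final String REALTIME_SUFFIX = "_REALTIME";

  // A raw table name carries no type suffix.
  static boolean isRawTableName(String tableName) {
    return !tableName.endsWith(OFFLINE_SUFFIX) && !tableName.endsWith(REALTIME_SUFFIX);
  }

  // Builds the offline table name, failing fast on already-typed input.
  static String offlineTableName(String rawTableName) {
    if (!isRawTableName(rawTableName)) {
      throw new IllegalArgumentException(
          "Expected a raw table name without a type suffix, got: " + rawTableName);
    }
    return rawTableName + OFFLINE_SUFFIX;
  }
}
```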
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,106 @@ public Map<String, Map<String, String>> getControllerJobs(
return result;
}
+ @POST
+ @Path("table/{tableName}/enforceQueryTimeBoundary")
Review Comment:
Suggest simplifying it. Same for the DELETE API
```suggestion
@Path("table/{tableName}/timeBoundary")
```
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -167,6 +181,14 @@ private void updateTimeBoundaryInfo(long maxEndTimeMs) {
@SuppressWarnings("unused")
public synchronized void onAssignmentChange(IdealState idealState,
ExternalView externalView,
Set<String> onlineSegments) {
+ String enforcedTimeBoundary =
idealState.getRecord().getSimpleField(CommonConstants.IdealState.QUERY_TIME_BOUNDARY);
Review Comment:
Same here. When the time boundary is explicitly set, we want to skip the
`_endTimeMsMap` updates. Same for `refreshSegment()`