KKcorps commented on code in PR #14798:
URL: https://github.com/apache/pinot/pull/14798#discussion_r1914056239
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -2009,4 +2119,151 @@ String moveSegmentFile(String rawTableName, String
segmentName, String segmentLo
/**
 * Builds the URI for a segment under the controller data directory, i.e. {@code <dataDir>/<rawTableName>/<encoded
 * segmentName>}, where the segment name is URL-encoded via {@code URIUtils.encode}.
 */
URI createSegmentPath(String rawTableName, String segmentName) {
  String dataDir = _controllerConf.getDataDir();
  String encodedSegmentName = URIUtils.encode(segmentName);
  return URIUtils.getUri(dataDir, rawTableName, encodedSegmentName);
}
+
+ /**
+ * Re-ingests segments that are in DONE status with a missing download URL,
but also
+ * have no peer copy on any server. This method will call the server
reIngestSegment API
+ * on one of the alive servers that are supposed to host that segment
according to IdealState.
+ *
+ * API signature:
+ * POST http://[serverURL]/reIngestSegment
+ * Request body (JSON):
+ * {
+ * "tableNameWithType": [tableName],
+ * "segmentName": [segmentName],
+ * "uploadURI": [leadControllerUrl],
+ * "uploadSegment": true
+ * }
+ *
+ * @param tableNameWithType The table name with type, e.g. "myTable_REALTIME"
+ */
+ public void reIngestSegmentsWithErrorState(String tableNameWithType) {
+ // Step 1: Fetch the ExternalView and all segments
+ ExternalView externalView = getExternalView(tableNameWithType);
+ IdealState idealState = getIdealState(tableNameWithType);
+ Map<String, Map<String, String>> segmentToInstanceCurrentStateMap =
externalView.getRecord().getMapFields();
+ Map<String, Map<String, String>> segmentToInstanceIdealStateMap =
idealState.getRecord().getMapFields();
+
+ // find segments in ERROR state in externalView
+ List<String> segmentsInErrorState = new ArrayList<>();
+ for (Map.Entry<String, Map<String, String>> entry :
segmentToInstanceCurrentStateMap.entrySet()) {
+ String segmentName = entry.getKey();
+ Map<String, String> instanceStateMap = entry.getValue();
+ boolean allReplicasInError = true;
+ for (String state : instanceStateMap.values()) {
+ if (!SegmentStateModel.ERROR.equals(state)) {
+ allReplicasInError = false;
+ break;
+ }
+ }
+ if (allReplicasInError) {
+ segmentsInErrorState.add(segmentName);
+ }
+ }
+
+ // filter out segments that are not ONLINE in IdealState
+ for (String segmentName : segmentsInErrorState) {
+ Map<String, String> instanceIdealStateMap =
segmentToInstanceIdealStateMap.get(segmentName);
+ boolean isOnline = true;
+ for (String state : instanceIdealStateMap.values()) {
+ if (!SegmentStateModel.ONLINE.equals(state)) {
+ isOnline = false;
+ break;
+ }
+ }
+ if (!isOnline) {
+ segmentsInErrorState.remove(segmentName);
+ }
+ }
+
+ // Step 2: For each segment, check the ZK metadata for conditions
+ for (String segmentName : segmentsInErrorState) {
+ // Skip non-LLC segments or segments missing from the ideal state
altogether
+ LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+ if (llcSegmentName == null ||
!segmentToInstanceCurrentStateMap.containsKey(segmentName)) {
+ continue;
+ }
+
+ SegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(tableNameWithType, segmentName);
+ // We only consider segments that are in COMMITTING which is indicated
by having an endOffset
+ // but have a missing or placeholder download URL
+ if (segmentZKMetadata.getStatus() == Status.COMMITTING) {
+ Map<String, String> instanceStateMap =
segmentToInstanceIdealStateMap.get(segmentName);
+
+ // Step 3: “No peer has that segment.” => Re-ingest from one server
that is supposed to host it and is alive
+ LOGGER.info(
+ "Segment {} in table {} is COMMITTING with missing download URL
and no peer copy. Triggering re-ingestion.",
+ segmentName, tableNameWithType);
+
+ // Find at least one server that should host this segment and is alive
+ String aliveServer =
findAliveServerToReIngest(instanceStateMap.keySet());
+ if (aliveServer == null) {
+ LOGGER.warn("No alive server found to re-ingest segment {} in table
{}", segmentName, tableNameWithType);
+ continue;
+ }
+
+ String leadControllerUrl = getControllerVipUrl();
+ try {
+ _fileUploadDownloadClient.triggerReIngestion(aliveServer,
tableNameWithType, segmentName, leadControllerUrl);
+ LOGGER.info("Successfully triggered reIngestion for segment {} on
server {}", segmentName, aliveServer);
+ } catch (Exception e) {
+ LOGGER.error("Failed to call reIngestSegment for segment {} on
server {}", segmentName, aliveServer, e);
+ }
+ } else if (segmentZKMetadata.getStatus() == Status.UPLOADED) {
+ LOGGER.info(
+ "Segment {} in table {} is in ERROR state with download URL
present. Resetting segment to ONLINE state.",
+ segmentName, tableNameWithType);
+ _helixResourceManager.resetSegment(tableNameWithType, segmentName,
null);
+ }
+ }
+ }
+
+ /**
+ * Picks one 'alive' server among a set of servers that are supposed to host
the segment,
+ * e.g. by checking if Helix says it is enabled or if it appears in the live
instance list.
+ * This is a simple example; adapt to your environment’s definition of
“alive.”
+ */
+ //TODO: Might need to send url registered in DNS instead of host:port
Review Comment:
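Address this TODO: decide whether the re-ingestion request should target the server URL registered in DNS instead of host:port, and resolve it before merging.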
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]