liuchang0520 commented on a change in pull request #6778:
URL: https://github.com/apache/incubator-pinot/pull/6778#discussion_r618105301
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1239,86 @@ private int
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
}
}
+
+ /**
+ * Validate the committed low level consumer segments to see if its segment
store copy is available. Fix the missing segment store copy by asking servers
to upload to segment store.
+ * Since uploading to segment store involves expensive compression step
(first tar up the segment and then upload), we don't want to retry the
uploading. Segment without segment store copy can still be downloaded from peer
servers.
+ * @see <a
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing
deep-store requirement for Realtime segment completion:Failure cases and
handling</a>
+ */
+ public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+ Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+ String realtimeTableName = tableConfig.getTableName();
+ // Get all the LLC segment ZK metadata for this table
+ List<LLCRealtimeSegmentZKMetadata> segmentZKMetadataList =
ZKMetadataProvider.getLLCRealtimeSegmentZKMetadataListForTable(_propertyStore,
realtimeTableName);
+
+ // Iterate through llc segments and upload missing segment store copy by
following steps:
+ // 1. Ask servers which have online segment replica to upload to segment
store. Servers return segment store download url after successful uploading.
+ // 2. Update the llc segment ZK metadata by adding segment store download
url.
+ for (LLCRealtimeSegmentZKMetadata segmentZKMetadata :
segmentZKMetadataList) {
+ String segmentName = segmentZKMetadata.getSegmentName();
+ // Only fix the committed llc segment without segment store copy
+ if (segmentZKMetadata.getStatus() == Status.DONE &&
(segmentZKMetadata.getDownloadUrl() == null ||
segmentZKMetadata.getDownloadUrl().isEmpty() ||
segmentZKMetadata.getDownloadUrl().equals(CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD)))
{
+ try {
+ if (!isExceededMaxSegmentCompletionTime(realtimeTableName,
segmentName, getCurrentTimeMs())) {
+ continue;
+ }
+ LOGGER.info("Fixing llc segment {} whose segment store copy is
unavailable", segmentName);
+
+ // Find servers which have online replica
+ List<URI> peerSegmentURIs = PeerServerSegmentFinder
+ .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL,
_helixManager);
+ if (peerSegmentURIs.isEmpty()) {
+ LOGGER.error("Failed to upload segment {} to segment store because
no online replica is found", segmentName);
+ continue;
+ }
+
+ // Randomly ask one server to upload
+ URI uri = peerSegmentURIs.get(_rand.nextInt(peerSegmentURIs.size()));
+ String serverUploadRequestUrl = StringUtil.join("/", uri.toString(),
"upload");
+ LOGGER.info("Ask server to upload llc segment to segment store by
this path: {}", serverUploadRequestUrl);
+ String segmentDownloadUrl =
uploadLLCSegmentByServer(serverUploadRequestUrl);
+
+ // Update the segment ZK metadata to include segment download url
+ if (segmentDownloadUrl == null || segmentDownloadUrl.isEmpty()) {
+ LOGGER.error("Failed to upload segment {} to segment store: no
segment download url is returned from server.", segmentName);
+ continue;
+ }
+ segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
+ persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
+ LOGGER.info("Successfully uploaded llc segment {} to segment store",
segmentName);
+ } catch (Exception e) {
+ LOGGER.error("Failed to upload segment {} to segment store",
segmentName, e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Helper function to ask server to upload llc segment to segment store
+ */
+ private String uploadLLCSegmentByServer(String serverUploadRequestUrl)
+ throws URISyntaxException, IOException, HttpErrorStatusException {
+ HttpUriRequest request = buildServerUploadRequest(serverUploadRequestUrl);
+ try (CloseableHttpResponse response = _httpClient.execute(request)) {
+ int statusCode = response.getStatusLine().getStatusCode();
+ String responseStr = response.getEntity() == null ? "" :
EntityUtils.toString(response.getEntity());
+ if (statusCode >= 300) {
+ StringBuilder errorMsgBuilder = new
StringBuilder(String.format("Failed to ask server to upload LLC segment to
segment store by url: %s. ", serverUploadRequestUrl));
+ if (!responseStr.isEmpty()) {
+ errorMsgBuilder.append(responseStr);
+ }
+ throw new HttpErrorStatusException(errorMsgBuilder.toString(),
statusCode);
Review comment:
It won't abort the following segments upload. Added a catch phrase at
line 1290 when we iterate through the segments.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]