Jackie-Jiang commented on a change in pull request #6778:
URL: https://github.com/apache/incubator-pinot/pull/6778#discussion_r612825455
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1226,59 @@ private int
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
}
}
+
+ /**
+ * Validate the committed low level consumer segments to see if its segment
store copy is available. Fix the missing segment store copy by asking servers
to upload to segment store.
+ * Since uploading to segment store involves expensive compression step
(first tar up the segment and then upload), we don't want to retry the
uploading. Segment without segment store copy can still be downloaded from peer
servers.
+ * @see <a
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing
deep-store requirement for Realtime segment completion:Failure cases and
handling</a>
+ */
+ public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+ Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+ String realtimeTableName = tableConfig.getTableName();
+ // Get the latest segment ZK metadata for each partition
+ Map<Integer, LLCRealtimeSegmentZKMetadata> llcSegmentZKMetadataMap =
+ getLatestSegmentZKMetadataMap(realtimeTableName);
+
+ // Iterate through llc segments and upload missing segment store copy by
following steps:
+ // 1. Ask servers which have online segment replica to upload to segment
store. Servers return segment store download url after successful uploading.
+ // 2. Update the llc segment ZK metadata by adding segment store download
url.
+ for (LLCRealtimeSegmentZKMetadata segmentZKMetadata :
llcSegmentZKMetadataMap.values()) {
+ String segmentName = segmentZKMetadata.getSegmentName();
+ // Only fix the committed llc segment without segment store copy
+ if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName) &&
segmentZKMetadata.getStatus() == Status.DONE &&
(segmentZKMetadata.getDownloadUrl() == null ||
segmentZKMetadata.getDownloadUrl().isEmpty())) {
Review comment:
No need to check the segment name because we only fetched the metadata
for the LLC segments
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1226,59 @@ private int
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
}
}
+
+ /**
+ * Validate the committed low level consumer segments to see if its segment
store copy is available. Fix the missing segment store copy by asking servers
to upload to segment store.
+ * Since uploading to segment store involves expensive compression step
(first tar up the segment and then upload), we don't want to retry the
uploading. Segment without segment store copy can still be downloaded from peer
servers.
+ * @see <a
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing
deep-store requirement for Realtime segment completion:Failure cases and
handling</a>
+ */
+ public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+ Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+ String realtimeTableName = tableConfig.getTableName();
+ // Get the latest segment ZK metadata for each partition
+ Map<Integer, LLCRealtimeSegmentZKMetadata> llcSegmentZKMetadataMap =
Review comment:
We need to check all the segments instead of only the latest segment for
each partition. The latest segment is the consuming segment.
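
A minimal sketch of the scan I have in mind, using hypothetical stand-in types (`SegmentMeta`, `Status`, and `segmentsNeedingUpload` are illustrative names, not Pinot classes). It walks every segment's metadata rather than only the latest one per partition, and keeps the committed segments that have no download URL:

```java
import java.util.ArrayList;
import java.util.List;

final class SegmentScan {
  // Hypothetical stand-in for the segment status in ZK metadata.
  enum Status { IN_PROGRESS, DONE }

  // Hypothetical stand-in for the per-segment ZK metadata.
  static final class SegmentMeta {
    final String name;
    final Status status;
    final String downloadUrl;

    SegmentMeta(String name, Status status, String downloadUrl) {
      this.name = name;
      this.status = status;
      this.downloadUrl = downloadUrl;
    }
  }

  // Returns the names of committed (DONE) segments that have no download URL.
  // The input is assumed to hold metadata for ALL segments of the table,
  // not just the latest segment of each partition.
  static List<String> segmentsNeedingUpload(List<SegmentMeta> allSegments) {
    List<String> result = new ArrayList<>();
    for (SegmentMeta meta : allSegments) {
      boolean urlMissing = meta.downloadUrl == null || meta.downloadUrl.isEmpty();
      if (meta.status == Status.DONE && urlMissing) {
        result.add(meta.name);
      }
    }
    return result;
  }
}
```

With this shape, the consuming (latest) segment is skipped naturally by the status check, and older committed segments are no longer missed.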
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1226,59 @@ private int
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
}
}
+
+ /**
+ * Validate the committed low level consumer segments to see if its segment
store copy is available. Fix the missing segment store copy by asking servers
to upload to segment store.
+ * Since uploading to segment store involves expensive compression step
(first tar up the segment and then upload), we don't want to retry the
uploading. Segment without segment store copy can still be downloaded from peer
servers.
+ * @see <a
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing
deep-store requirement for Realtime segment completion:Failure cases and
handling</a>
+ */
+ public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+ Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+ String realtimeTableName = tableConfig.getTableName();
+ // Get the latest segment ZK metadata for each partition
+ Map<Integer, LLCRealtimeSegmentZKMetadata> llcSegmentZKMetadataMap =
+ getLatestSegmentZKMetadataMap(realtimeTableName);
+
+ // Iterate through llc segments and upload missing segment store copy by
following steps:
+ // 1. Ask servers which have online segment replica to upload to segment
store. Servers return segment store download url after successful uploading.
+ // 2. Update the llc segment ZK metadata by adding segment store download
url.
+ for (LLCRealtimeSegmentZKMetadata segmentZKMetadata :
llcSegmentZKMetadataMap.values()) {
+ String segmentName = segmentZKMetadata.getSegmentName();
+ // Only fix the committed llc segment without segment store copy
+ if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName) &&
segmentZKMetadata.getStatus() == Status.DONE &&
(segmentZKMetadata.getDownloadUrl() == null ||
segmentZKMetadata.getDownloadUrl().isEmpty())) {
Review comment:
I vaguely remember we put a placeholder download URL for peer download.
Please verify whether that is the case; if so, the null/empty check is not
sufficient here.
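
A rough sketch of the kind of check that would cover a placeholder, if one exists. The constant value below is purely hypothetical; the actual placeholder (if any) has to be looked up in the codebase, and `DownloadUrlCheck` is an illustrative name only:

```java
final class DownloadUrlCheck {
  // HYPOTHETICAL placeholder value, used only for illustration.
  // Replace with whatever the peer-download path really writes, if anything.
  static final String PEER_DOWNLOAD_PLACEHOLDER = "PEER_DOWNLOAD_PLACEHOLDER";

  // True when the download URL does not point at a real segment store copy:
  // it is null, empty, or equal to the (assumed) peer-download placeholder.
  static boolean isSegmentStoreCopyMissing(String downloadUrl) {
    return downloadUrl == null
        || downloadUrl.isEmpty()
        || PEER_DOWNLOAD_PLACEHOLDER.equals(downloadUrl);
  }
}
```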
##########
File path:
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
##########
@@ -965,6 +1065,13 @@ public void ensureAllPartitionsConsuming() {
getNewPartitionGroupMetadataList(_streamConfig,
Collections.emptyList()));
}
+ @Override
+ FileUploadDownloadClient initFileUploadDownloadClient() {
+ FileUploadDownloadClient fileUploadDownloadClient =
mock(FileUploadDownloadClient.class);
+ this._mockedFileUploadDownloadClient = fileUploadDownloadClient;
Review comment:
(nit) remove `this`
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1226,59 @@ private int
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
}
}
+
+ /**
+ * Validate the committed low level consumer segments to see if its segment
store copy is available. Fix the missing segment store copy by asking servers
to upload to segment store.
+ * Since uploading to segment store involves expensive compression step
(first tar up the segment and then upload), we don't want to retry the
uploading. Segment without segment store copy can still be downloaded from peer
servers.
+ * @see <a
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing
deep-store requirement for Realtime segment completion:Failure cases and
handling</a>
+ */
+ public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+ Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+ String realtimeTableName = tableConfig.getTableName();
+ // Get the latest segment ZK metadata for each partition
+ Map<Integer, LLCRealtimeSegmentZKMetadata> llcSegmentZKMetadataMap =
+ getLatestSegmentZKMetadataMap(realtimeTableName);
+
+ // Iterate through llc segments and upload missing segment store copy by
following steps:
+ // 1. Ask servers which have online segment replica to upload to segment
store. Servers return segment store download url after successful uploading.
+ // 2. Update the llc segment ZK metadata by adding segment store download
url.
+ for (LLCRealtimeSegmentZKMetadata segmentZKMetadata :
llcSegmentZKMetadataMap.values()) {
+ String segmentName = segmentZKMetadata.getSegmentName();
+ // Only fix the committed llc segment without segment store copy
+ if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName) &&
segmentZKMetadata.getStatus() == Status.DONE &&
(segmentZKMetadata.getDownloadUrl() == null ||
segmentZKMetadata.getDownloadUrl().isEmpty())) {
+ try {
+ if (!isExceededMaxSegmentCompletionTime(realtimeTableName,
segmentName, getCurrentTimeMs())) {
+ continue;
+ }
+ LOGGER.info("Fixing llc segment {} whose segment store copy is
unavailable", segmentName);
+
+ // Find servers which have online replica
+ List<URI> peerSegmentURIs = PeerServerSegmentFinder
+ .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL,
_helixManager);
+ if (peerSegmentURIs.isEmpty()) {
+ LOGGER.error("Failed to upload segment {} to segment store because
no online replica is found", segmentName);
+ continue;
+ }
+
+ // Randomly ask one server to upload
+ Random r = new Random();
+ URI uri = peerSegmentURIs.get(r.nextInt(peerSegmentURIs.size()));
Review comment:
Please log the uri
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1226,59 @@ private int
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
}
}
+
+ /**
+ * Validate the committed low level consumer segments to see if its segment
store copy is available. Fix the missing segment store copy by asking servers
to upload to segment store.
+ * Since uploading to segment store involves expensive compression step
(first tar up the segment and then upload), we don't want to retry the
uploading. Segment without segment store copy can still be downloaded from peer
servers.
+ * @see <a
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing
deep-store requirement for Realtime segment completion:Failure cases and
handling</a>
+ */
+ public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+ Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+ String realtimeTableName = tableConfig.getTableName();
+ // Get the latest segment ZK metadata for each partition
+ Map<Integer, LLCRealtimeSegmentZKMetadata> llcSegmentZKMetadataMap =
+ getLatestSegmentZKMetadataMap(realtimeTableName);
+
+ // Iterate through llc segments and upload missing segment store copy by
following steps:
+ // 1. Ask servers which have online segment replica to upload to segment
store. Servers return segment store download url after successful uploading.
+ // 2. Update the llc segment ZK metadata by adding segment store download
url.
+ for (LLCRealtimeSegmentZKMetadata segmentZKMetadata :
llcSegmentZKMetadataMap.values()) {
+ String segmentName = segmentZKMetadata.getSegmentName();
+ // Only fix the committed llc segment without segment store copy
+ if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName) &&
segmentZKMetadata.getStatus() == Status.DONE &&
(segmentZKMetadata.getDownloadUrl() == null ||
segmentZKMetadata.getDownloadUrl().isEmpty())) {
+ try {
+ if (!isExceededMaxSegmentCompletionTime(realtimeTableName,
segmentName, getCurrentTimeMs())) {
+ continue;
+ }
+ LOGGER.info("Fixing llc segment {} whose segment store copy is
unavailable", segmentName);
+
+ // Find servers which have online replica
+ List<URI> peerSegmentURIs = PeerServerSegmentFinder
+ .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL,
_helixManager);
+ if (peerSegmentURIs.isEmpty()) {
+ LOGGER.error("Failed to upload segment {} to segment store because
no online replica is found", segmentName);
+ continue;
+ }
+
+ // Randomly ask one server to upload
+ Random r = new Random();
Review comment:
Make the `Random` a constant instead of creating one per segment
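
Something along these lines, as a sketch only (`PeerPicker`, `RANDOM`, and `pickRandom` are illustrative names; in the PR this would presumably be a constant and a few lines inside the manager class):

```java
import java.net.URI;
import java.util.List;
import java.util.Random;

final class PeerPicker {
  // Shared instance, created once instead of once per segment.
  // java.util.Random is safe to use from multiple threads.
  private static final Random RANDOM = new Random();

  // Picks one URI uniformly at random from a non-empty list.
  static URI pickRandom(List<URI> peerSegmentURIs) {
    if (peerSegmentURIs.isEmpty()) {
      throw new IllegalArgumentException("No peer URIs to choose from");
    }
    return peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
  }
}
```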
##########
File path:
pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -727,6 +727,23 @@ public SimpleHttpResponse uploadSegment(URI uri, String
segmentName, InputStream
return uploadSegment(uri, segmentName, inputStream, null, parameters,
DEFAULT_SOCKET_TIMEOUT_MS);
}
+ /**
+ * Controller periodic task uses this endpoint to ask servers to upload
committed llc segment to segment store if missing.
+ * @param uri The uri to ask servers to upload segment to segment store
+ * @return the uploaded segment download url from segment store
+ * @throws URISyntaxException
+ * @throws IOException
+ * @throws HttpErrorStatusException
+ */
+ public String uploadToSegmentStore(String uri)
Review comment:
I wouldn't put this method in this class. This class is used to
upload/download files from the controller, whereas this method sends a request
to the servers. You can just add a helper method to
`PinotLLCRealtimeSegmentManager`.