mcvsubbu commented on a change in pull request #6778:
URL: https://github.com/apache/pinot/pull/6778#discussion_r704761568
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
##########
@@ -173,6 +173,12 @@
public static final String SEGMENT_RELOCATOR_INITIAL_DELAY_IN_SECONDS =
"controller.segmentRelocator.initialDelayInSeconds";
+ // configs for uploading missing LLC segments to deep store
+ public static final String ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT =
Review comment:
Can you please add a paragraph (liberal comments with examples; they are
zero cost) before each config you have introduced? It will help a lot when we
stare at this code six months down the road, or when we start to look at some
behavior in production.
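For example, something along these lines. This is a hypothetical sketch: the key names, defaults and the small reader class are illustrative and are not the values used in this PR.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;

final class DeepStoreUploadConfigSketch {
  /**
   * Hypothetical key: whether the controller periodic task asks servers to upload committed
   * LLC segments that have no deep store copy (download URL still set to the peer download URI).
   * Example: {@code controller.example.enableDeepStoreRetryUpload=true}. Default: {@code false}.
   */
  static final String ENABLE_RETRY_UPLOAD_KEY = "controller.example.enableDeepStoreRetryUpload";
  static final boolean DEFAULT_ENABLE_RETRY_UPLOAD = false;

  /**
   * Hypothetical key: only LLC segments created within this many days (creation time is parsed
   * from the segment name) are checked, which bounds the number of ZK reads per run.
   * Example: {@code controller.example.deepStoreRetryRangeDays=3} ignores segments older than 3 days.
   */
  static final String RETRY_RANGE_DAYS_KEY = "controller.example.deepStoreRetryRangeDays";
  static final int DEFAULT_RETRY_RANGE_DAYS = 3;

  private DeepStoreUploadConfigSketch() {
  }

  // Falls back to the documented default when the key is absent.
  static boolean isRetryUploadEnabled(Map<String, String> conf) {
    String value = conf.get(ENABLE_RETRY_UPLOAD_KEY);
    return value == null ? DEFAULT_ENABLE_RETRY_UPLOAD : Boolean.parseBoolean(value.trim());
  }

  // Converts the configured number of days into milliseconds.
  static long retryRangeMs(Map<String, String> conf) {
    String value = conf.get(RETRY_RANGE_DAYS_KEY);
    int days = value == null ? DEFAULT_RETRY_RANGE_DAYS : Integer.parseInt(value.trim());
    return TimeUnit.DAYS.toMillis(days);
  }
}
```

Keeping the default right next to the key and its example makes the effective behavior readable without digging through the code that consumes it.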
##########
File path:
pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -796,6 +796,33 @@ public SimpleHttpResponse uploadSegment(URI uri, String segmentName, InputStream
return uploadSegment(uri, segmentName, inputStream, null, parameters, DEFAULT_SOCKET_TIMEOUT_MS);
}
+ /**
+ * Used by controllers to send requests to servers:
+ * Controller periodic task uses this endpoint to ask servers
+ * to upload committed llc segment to segment store if missing.
+ * @param uri The uri to ask servers to upload segment to segment store
+ * @return the uploaded segment download url from segment store
+ * @throws URISyntaxException
+ * @throws IOException
+ * @throws HttpErrorStatusException
+ */
+ public String uploadToSegmentStore(String uri)
+ throws URISyntaxException, IOException, HttpErrorStatusException {
+ RequestBuilder requestBuilder = RequestBuilder.post(new URI(uri)).setVersion(HttpVersion.HTTP_1_1);
+ setTimeout(requestBuilder, DEFAULT_SOCKET_TIMEOUT_MS);
+ // sendRequest checks the response status code
+ SimpleHttpResponse response = sendRequest(requestBuilder.build());
+ String downloadUrl = response.getResponse();
+ if (downloadUrl.isEmpty()) {
+ throw new HttpErrorStatusException(
+ String.format(
+ "Returned segment download url is empty after requesting servers to upload by the path: %s",
+ uri),
Review comment:
I think this URI includes the segment name. If not, please add it to the
message.
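If the URI turns out not to carry the segment name, a minimal sketch of the message, assuming `uploadToSegmentStore` is also given the segment name (a hypothetical extra parameter, not part of the current signature):

```java
final class UploadErrorMessageSketch {
  private UploadErrorMessageSketch() {
  }

  /** Builds the "empty download URL" error message with both the segment name and the request URI. */
  static String emptyDownloadUrlMessage(String segmentName, String uri) {
    return String.format(
        "Returned segment download url is empty after requesting servers to upload segment %s by the path: %s",
        segmentName, uri);
  }
}
```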
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
##########
@@ -67,6 +68,23 @@ public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourc
Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
}
+ // TODO: Fix the race condition when controller leadership may not be decided by the time the method is called
Review comment:
Is there an issue created for this? If not, please create one and add the
issue number here. How and when do you plan to fix this race condition? It
should be sooner rather than later; you may want to get another PR ready for
that quickly.
If you think it is going to take a very long time, then we should just wait
until we merge this PR. Otherwise, the subtlety of the race condition will be
lost.
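One possible direction, sketched under assumptions: instead of prefetching once when the task starts (when leadership may still be undecided), attempt the prefetch on every periodic run and only after leadership for the table is confirmed. `LeaderAwarePrefetcher`, the leadership predicate and the prefetch callback are hypothetical stand-ins, not existing Pinot APIs.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class LeaderAwarePrefetcher {
  private final Predicate<String> _isLeaderForTable;
  private final Consumer<String> _prefetch;
  // Tables prefetched while this controller was confirmed as leader.
  private final Set<String> _prefetchedTables = ConcurrentHashMap.newKeySet();

  LeaderAwarePrefetcher(Predicate<String> isLeaderForTable, Consumer<String> prefetch) {
    _isLeaderForTable = isLeaderForTable;
    _prefetch = prefetch;
  }

  /** Called from each periodic run; returns true only if the prefetch ran in this call. */
  boolean maybePrefetch(String tableNameWithType) {
    if (!_isLeaderForTable.test(tableNameWithType)) {
      // Not (yet) leader: forget any earlier prefetch so regaining leadership triggers a fresh one.
      _prefetchedTables.remove(tableNameWithType);
      return false;
    }
    if (!_prefetchedTables.add(tableNameWithType)) {
      // Already prefetched during the current leadership.
      return false;
    }
    _prefetch.accept(tableNameWithType);
    return true;
  }
}
```

The trade-off is one extra leadership check per table per run, in exchange for not depending on leadership being settled at startup.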
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1243,4 +1327,169 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup;
}
}
+
+ // Pre-fetch the LLC segments without deep store copy.
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String tableNameWithType) {
+ TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType != TableType.REALTIME) {
+ return;
+ }
+
+ TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}", tableNameWithType);
+ return;
+ }
+
+ PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ if (!streamConfig.hasLowLevelConsumerType()) {
+ return;
+ }
+
+ long currentTimeMs = getCurrentTimeMs();
+ List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+ for (String segmentName : segmentNames) {
+ try {
+ // Only fetch recently created LLC segment to alleviate ZK access.
+ // Validate segment creation time from segment name.
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ if (currentTimeMs - llcSegmentName.getCreationTimeMs() > _deepStoreLLCSegmentUploadRetryRangeMs) {
+ continue;
+ }
+
+ SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+ // Cache the committed LLC segments without deep store download url
+ if (segmentZKMetadata.getStatus() == Status.DONE
+ && CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+ cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+ }
+ } catch (Exception e) {
+ _controllerMetrics.addMeteredTableValue(tableNameWithType,
+ ControllerMeter.LLC_SEGMENTS_ZK_METADATA_PREFETCH_ERROR, 1L);
+ LOGGER.error("Failed to fetch the LLC segment {} ZK metadata", segmentName);
+ }
+ }
+ }
+
+ /**
+ * Fix the missing LLC segment in deep store by asking servers to upload, and add deep store download uri in ZK.
+ * Since uploading to deep store involves expensive compression step (first tar up the segment and then upload),
+ * we don't want to retry the uploading. Segment without deep store copy can still be downloaded from peer servers.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion">
+ * By-passing deep-store requirement for Realtime segment completion: Failure cases and handling</a>
+ *
+ * TODO: Add an on-demand way to upload LLC segment to deep store for a specific table.
+ */
+ public void uploadToDeepStoreIfMissing(TableConfig tableConfig) {
+ String realtimeTableName = tableConfig.getTableName();
+ Queue<String> segmentQueue = _llcSegmentMapForUpload.get(realtimeTableName);
+ if (segmentQueue == null || segmentQueue.isEmpty()) {
+ return;
+ }
+
+ if (_isStopping) {
+ LOGGER.info(
+ "Skipped fixing deep store copy of LLC segments for table {}, because segment manager is stopping.",
+ realtimeTableName);
+ return;
+ }
+
+ // Store the segments to be fixed again in the case of fix failure, or skip in this round
+ Queue<String> segmentsNotFixed = new LinkedList<>();
+
+ long retentionMs =
+ TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit().toUpperCase())
+ .toMillis(Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue()));
+ RetentionStrategy retentionStrategy = new TimeRetentionStrategy(
+ TimeUnit.MILLISECONDS,
+ retentionMs - MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS);
+
+ // Iterate through LLC segments and upload missing deep store copy by following steps:
+ // 1. Ask servers which have online segment replica to upload to deep store.
+ // Servers return deep store download url after successful uploading.
+ // 2. Update the LLC segment ZK metadata by adding deep store download url.
+ while (!segmentQueue.isEmpty()) {
+ // TODO: Reevaluate the parallelism of upload operation. Currently the upload operation is conducted in
+ // sequential order. Compared with parallel mode, it will take longer time but put less pressure on
+ // servers. We may need to rate control the upload request if it is changed to be in parallel.
+ String segmentName = segmentQueue.poll();
+ try {
+ // Only fix recently created segment. Validate segment creation time based on name.
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ if (getCurrentTimeMs() - llcSegmentName.getCreationTimeMs() > _deepStoreLLCSegmentUploadRetryRangeMs) {
+ LOGGER.info(
+ "Skipped fixing LLC segment {} which is created before deep store upload retry time range",
+ segmentName);
+ continue;
+ }
+
+ Stat stat = new Stat();
+ SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+ // If the download url is already fixed, skip the fix for this segment.
+ if (!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+ LOGGER.info(
+ "Skipped fixing LLC segment {} whose deep store download url is already available",
+ segmentName);
+ continue;
+ }
+ // Skip the fix for the segment if it is already out of retention.
+ if (retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
+ LOGGER.info("Skipped fixing LLC segment {} which is already out of retention", segmentName);
+ continue;
+ }
+ // Delay the fix to next round if not enough time elapsed since segment metadata update
+ if (!isExceededMinTimeToFixDeepStoreCopy(stat)) {
Review comment:
Is it useful to add a log message here as well?
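For instance, a log line on the "delay to next round" branch would make the re-queueing visible. The sketch below is a simplified, hypothetical model of the per-segment checks in `uploadToDeepStoreIfMissing`: plain timestamps replace `SegmentZKMetadata`, `Stat` and `TimeRetentionStrategy`; it assumes LLC segment names of the form `table__partition__sequence__yyyyMMddTHHmmZ` (UTC); and it assumes `isExceededMinTimeToFixDeepStoreCopy` compares the ZK modification time against a minimum delay.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.logging.Logger;

final class DeepStoreFixDecisionSketch {
  enum Decision { SKIP_TOO_OLD, SKIP_ALREADY_FIXED, SKIP_OUT_OF_RETENTION, RETRY_NEXT_ROUND, UPLOAD }

  private static final Logger LOGGER = Logger.getLogger(DeepStoreFixDecisionSketch.class.getName());
  // Assumed creation-time suffix of an LLC segment name, e.g. "20210901T1200Z" (UTC).
  private static final DateTimeFormatter CREATION_TIME_FORMAT = DateTimeFormatter.ofPattern("uuuuMMdd'T'HHmm'Z'");

  private final long _retryRangeMs;
  private final long _retentionMs;
  private final long _minTimeBeforeExpirationMs;
  private final long _minTimeSinceUpdateMs;

  DeepStoreFixDecisionSketch(long retryRangeMs, long retentionMs, long minTimeBeforeExpirationMs,
      long minTimeSinceUpdateMs) {
    _retryRangeMs = retryRangeMs;
    _retentionMs = retentionMs;
    _minTimeBeforeExpirationMs = minTimeBeforeExpirationMs;
    _minTimeSinceUpdateMs = minTimeSinceUpdateMs;
  }

  static long creationTimeMs(String llcSegmentName) {
    String[] parts = llcSegmentName.split("__");
    if (parts.length != 4) {
      throw new IllegalArgumentException("Not an LLC segment name: " + llcSegmentName);
    }
    return LocalDateTime.parse(parts[3], CREATION_TIME_FORMAT).toInstant(ZoneOffset.UTC).toEpochMilli();
  }

  Decision decide(String segmentName, boolean hasDeepStoreUrl, long segmentEndTimeMs, long zkMtimeMs, long nowMs) {
    if (nowMs - creationTimeMs(segmentName) > _retryRangeMs) {
      return Decision.SKIP_TOO_OLD;
    }
    if (hasDeepStoreUrl) {
      return Decision.SKIP_ALREADY_FIXED;
    }
    // Treat the segment as expired a bit early so the fix does not race with retention.
    if (nowMs - segmentEndTimeMs > _retentionMs - _minTimeBeforeExpirationMs) {
      return Decision.SKIP_OUT_OF_RETENTION;
    }
    long elapsedMs = nowMs - zkMtimeMs;
    if (elapsedMs <= _minTimeSinceUpdateMs) {
      // The log line discussed in this comment: explains why the segment is re-queued.
      LOGGER.info(String.format(
          "Delaying deep store fix of LLC segment %s to next round: %d ms since ZK metadata update, need > %d ms",
          segmentName, elapsedMs, _minTimeSinceUpdateMs));
      return Decision.RETRY_NEXT_ROUND;
    }
    return Decision.UPLOAD;
  }
}
```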
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -613,6 +687,16 @@ private boolean isPeerURL(String segmentLocation) {
.startsWith(CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME);
}
+ /**
+ * Cache the LLC segment without deep store download uri to
Review comment:
This class is getting too big. Is it possible to move some of the newly added
code to a new class? That will make things more readable.
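For example, the per-table queue of segments waiting for a deep store copy (`_llcSegmentMapForUpload` and `cacheLLCSegmentNameForUpload`) could live in its own small class. This is a hypothetical sketch; the class and method names are illustrative and only assume the map holds a `Queue<String>` per table, as the diff suggests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

final class LLCSegmentUploadCache {
  private final Map<String, Queue<String>> _segmentsByTable = new ConcurrentHashMap<>();

  /** Records a committed LLC segment whose download URL still points to peer download. */
  void add(String tableNameWithType, String segmentName) {
    _segmentsByTable.computeIfAbsent(tableNameWithType, t -> new ConcurrentLinkedQueue<>()).offer(segmentName);
  }

  /** Removes and returns all cached segments of a table, in insertion order. */
  List<String> drain(String tableNameWithType) {
    List<String> drained = new ArrayList<>();
    Queue<String> queue = _segmentsByTable.get(tableNameWithType);
    if (queue == null) {
      return drained;
    }
    String segmentName;
    while ((segmentName = queue.poll()) != null) {
      drained.add(segmentName);
    }
    return drained;
  }

  /** Re-queues segments that could not be fixed in this round. */
  void requeue(String tableNameWithType, List<String> segmentNames) {
    for (String segmentName : segmentNames) {
      add(tableNameWithType, segmentName);
    }
  }

  /** Drops all state for a table, e.g. when the table is deleted. */
  void removeTable(String tableNameWithType) {
    _segmentsByTable.remove(tableNameWithType);
  }
}
```

The segment manager would then only hold a reference to this cache, and the upload loop could work on a drained list instead of polling the shared queue directly.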
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.