mcvsubbu commented on a change in pull request #6778:
URL: https://github.com/apache/incubator-pinot/pull/6778#discussion_r644165186
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -135,9 +152,16 @@
private final Lock[] _idealStateUpdateLocks;
private final TableConfigCache _tableConfigCache;
private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
+ private final boolean _isUploadingRealtimeMissingSegmentStoreCopyEnabled;
private volatile boolean _isStopping = false;
private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
+ private FileUploadDownloadClient _fileUploadDownloadClient;
+ // Map caching the LLC segment names without deep store download uri.
Controller gets the LLC segment names from this map, and asks servers to upload
the segments to segment store. This helps to alleviates excessive ZK access
when fetching LLC segment list.
Review comment:
Nit: line too long. Please fix all of your comment lines that are too
long. You may need to update your IDE settings.
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
}
}
+
+ // Pre-fetch the LLC segments without deep store copy.
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String
tableNameWithType) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType != TableType.REALTIME) {
+ return;
+ }
+
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}",
tableNameWithType);
+ return;
+ }
+
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ if (!streamConfig.hasLowLevelConsumerType()) {
+ return;
+ }
+
+ long currentTimeMs = getCurrentTimeMs();
+ List<String> segmentNames =
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+ for (String segmentName : segmentNames) {
+ try {
+ if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+ continue;
+ }
+
+ LLCRealtimeSegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+ // Cache the committed LLC segments without segment store download
url
+ if (segmentZKMetadata.getStatus() == Status.DONE &&
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
{
+ cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+ }
+ } catch (Exception e) {
+ LOGGER.error("failed to fetch the LLC segment {} ZK metadata",
segmentName);
+ }
+ }
+ }
+
+ // Only validate recently created LLC segment for missing deep store
download url. The time range check is based on segment name. This step helps to
alleviate ZK access.
+ private boolean isLLCSegmentWithinValidationRange(String segmentName, long
currentTimeMs) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ long creationTimeMs = llcSegmentName.getCreationTimeMs();
+ return currentTimeMs - creationTimeMs <
_validationRangeForLLCSegmentsDeepStoreCopyMs;
+ }
+
+ /**
+ * Fix the missing LLC segment in deep store by asking servers to upload,
and add segment store download uri in ZK. Since uploading to segment store
involves expensive compression step (first tar up the segment and then upload),
we don't want to retry the uploading. Segment without segment store copy can
still be downloaded from peer servers.
+ * @see <a
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing
deep-store requirement for Realtime segment completion:Failure cases and
handling</a>
+ */
+ public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+ String realtimeTableName = tableConfig.getTableName();
+ if (_isStopping) {
+ LOGGER.info("Skipped fixing segment store copy of LLC segments for table
{}, because segment manager is stopping.", realtimeTableName);
+ return;
+ }
+
+ Queue<String> segmentQueue =
_llcSegmentMapForUpload.get(realtimeTableName);
+ if (segmentQueue == null || segmentQueue.isEmpty()) {
+ LOGGER.info("Skipped fixing segment store copy of LLC segments for table
{}: all segments are available in segment store.", realtimeTableName);
+ return;
+ }
+
+ // Store the segments to be fixed again in the case of fix failure, or
skip in this round
+ Queue<String> segmentsNotFixed = new LinkedList<>();
+
+ // Iterate through LLC segments and upload missing segment store copy by
following steps:
+ // 1. Ask servers which have online segment replica to upload to segment
store. Servers return segment store download url after successful uploading.
+ // 2. Update the LLC segment ZK metadata by adding segment store download
url.
+ while (!segmentQueue.isEmpty()) {
+ String segmentName = segmentQueue.poll();
+ // Check if it's null in case of the while condition doesn't stand true
anymore in the step of dequeue. Dequeue returns null if queue is empty.
+ if (segmentName == null) {
+ break;
+ }
+
+ try {
+ Stat stat = new Stat();
+ LLCRealtimeSegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+ // if the segment status is no longer DONE, or the download url is
already fixed, skip the fix for this segment.
+ if (segmentZKMetadata.getStatus() != Status.DONE ||
!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
{
+ continue;
+ }
+ // delay the fix to next round if not enough time elapsed since
segment metadata update
+ if (!isExceededMinTimeToFixSegmentStoreCopy(stat)) {
+ segmentsNotFixed.offer(segmentName);
+ continue;
+ }
+ LOGGER.info("Fixing LLC segment {} whose segment store copy is
unavailable", segmentName);
+
+ // Find servers which have online replica
+ List<URI> peerSegmentURIs = PeerServerSegmentFinder
+ .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL,
_helixManager);
+ if (peerSegmentURIs.isEmpty()) {
+ throw new IllegalStateException(String.format("Failed to upload
segment %s to segment store because no online replica is found", segmentName));
+ }
+
+ // Randomly ask one server to upload
+ URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
+ String serverUploadRequestUrl = StringUtil.join("/", uri.toString(),
"upload");
+ LOGGER.info("Ask server to upload LLC segment {} to segment store by
this path: {}", segmentName, serverUploadRequestUrl);
+ String segmentDownloadUrl =
_fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
+ LOGGER.info("Updating segment {} download url in ZK to be {}",
segmentName, segmentDownloadUrl);
+ // Update segment ZK metadata by adding the download URL
+ segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
+ persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata,
stat.getVersion());
+ LOGGER.info("Successfully uploaded LLC segment {} to segment store
with download url: {}", segmentName, segmentDownloadUrl);
+ } catch (Exception e) {
+ segmentsNotFixed.offer(segmentName);
+ _controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.NUMBER_LLC_SEGMENTS_DEEP_STORE_UPLOAD_FIX_ERROR, 1L);
+ LOGGER.error("Failed to upload segment {} to segment store",
segmentName, e);
Review comment:
If a segment is present in the queue but the retention manager has
deleted it, we will keep retrying that segment for a long time. We should make
sure that the retention manager also removes segments from the queue as they
age out of retention.
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -122,6 +134,11 @@
* The segment will be eligible for repairs by the validation manager, if
the time exceeds this value
*/
private static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000L; //
5 MINUTES
+ /**
Review comment:
Nit: Can you match the comment style used in lines 129 to 135 (an
asterisk at the start of each new line)? Thanks.
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -157,6 +181,21 @@ public
PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceMan
}
_tableConfigCache = new TableConfigCache(_propertyStore);
_flushThresholdUpdateManager = new FlushThresholdUpdateManager();
+ _isUploadingRealtimeMissingSegmentStoreCopyEnabled =
controllerConf.isUploadingRealtimeMissingSegmentStoreCopyEnabled();
+ if (_isUploadingRealtimeMissingSegmentStoreCopyEnabled) {
+ _fileUploadDownloadClient = initFileUploadDownloadClient();
+ _llcSegmentMapForUpload = new ConcurrentHashMap<>();
+ _validationRangeForLLCSegmentsDeepStoreCopyMs =
(long)controllerConf.getValidationRangeInDaysToCheckMissingSegmentStoreCopy() *
24 * 3600 * 1000;
Review comment:
Use the `TimeUnit` class to convert to milliseconds (e.g. `TimeUnit.DAYS.toMillis(days)`). Thanks.
##########
File path:
pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
##########
@@ -49,7 +49,8 @@
NUMBER_TIMES_SCHEDULE_TASKS_CALLED("tasks", true),
NUMBER_TASKS_SUBMITTED("tasks", false),
NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED("SegmentUploadTimeouts", true),
- CRON_SCHEDULER_JOB_TRIGGERED("cronSchedulerJobTriggered", false);
+ CRON_SCHEDULER_JOB_TRIGGERED("cronSchedulerJobTriggered", false),
+
NUMBER_LLC_SEGMENTS_DEEP_STORE_UPLOAD_FIX_ERROR("llcSegmentDeepStoreUploadFixError",
true);
Review comment:
Please make this a gauge. You can set the gauge to the number of
segments in `_llcSegmentMapForUpload`. It is best not to make it a per-table
value, since all tables are likely to have these errors.
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -122,6 +134,11 @@
* The segment will be eligible for repairs by the validation manager, if
the time exceeds this value
*/
private static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000L; //
5 MINUTES
+ /**
+ * Controller waits this amount of time before asking servers to upload LLC
segments without deep store copy. The reason is after step 1 of segment
completion is done (segment ZK metadata status changed to be DONE), servers are
still in the process of loading segments. Only after that segments are in
ONLINE status in external view for the controller to discover.
Review comment:
```suggestion
* Controller waits this amount of time before asking servers to upload
LLC segments without deep store copy.
* The reason is after step 1 of segment completion is done (segment ZK
metadata status changed to be
* DONE), servers may be still in the process of loading segments. Only
after that segments are in ONLINE
* status in external view for the controller to discover.
```
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
}
}
+
+ // Pre-fetch the LLC segments without deep store copy.
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String
tableNameWithType) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType != TableType.REALTIME) {
+ return;
+ }
+
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}",
tableNameWithType);
+ return;
+ }
+
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ if (!streamConfig.hasLowLevelConsumerType()) {
+ return;
+ }
+
+ long currentTimeMs = getCurrentTimeMs();
+ List<String> segmentNames =
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+ for (String segmentName : segmentNames) {
+ try {
+ if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+ continue;
+ }
+
+ LLCRealtimeSegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+ // Cache the committed LLC segments without segment store download
url
+ if (segmentZKMetadata.getStatus() == Status.DONE &&
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
{
+ cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+ }
+ } catch (Exception e) {
+ LOGGER.error("failed to fetch the LLC segment {} ZK metadata",
segmentName);
Review comment:
This could be bad, right? If we are not able to fetch the segment metadata
here, that segment will stay missing from deep store for a long time, until the
next restart or controller leadership change. I think this should bump a metric.
```suggestion
LOGGER.error("Failed to fetch the LLC segment {} ZK metadata",
segmentName);
```
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
}
}
+
+ // Pre-fetch the LLC segments without deep store copy.
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String
tableNameWithType) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType != TableType.REALTIME) {
+ return;
+ }
+
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}",
tableNameWithType);
+ return;
+ }
+
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ if (!streamConfig.hasLowLevelConsumerType()) {
+ return;
+ }
+
+ long currentTimeMs = getCurrentTimeMs();
+ List<String> segmentNames =
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+ for (String segmentName : segmentNames) {
+ try {
+ if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
Review comment:
I would prefer a method name like `isOlderThan` or `isNewerThan`.
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
}
}
+
+ // Pre-fetch the LLC segments without deep store copy.
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String
tableNameWithType) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType != TableType.REALTIME) {
+ return;
+ }
+
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}",
tableNameWithType);
+ return;
+ }
+
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ if (!streamConfig.hasLowLevelConsumerType()) {
+ return;
+ }
+
+ long currentTimeMs = getCurrentTimeMs();
+ List<String> segmentNames =
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+ for (String segmentName : segmentNames) {
+ try {
+ if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+ continue;
+ }
+
+ LLCRealtimeSegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+ // Cache the committed LLC segments without segment store download
url
+ if (segmentZKMetadata.getStatus() == Status.DONE &&
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
{
+ cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+ }
+ } catch (Exception e) {
+ LOGGER.error("failed to fetch the LLC segment {} ZK metadata",
segmentName);
+ }
+ }
+ }
+
+ // Only validate recently created LLC segment for missing deep store
download url. The time range check is based on segment name. This step helps to
alleviate ZK access.
+ private boolean isLLCSegmentWithinValidationRange(String segmentName, long
currentTimeMs) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ long creationTimeMs = llcSegmentName.getCreationTimeMs();
+ return currentTimeMs - creationTimeMs <
_validationRangeForLLCSegmentsDeepStoreCopyMs;
+ }
+
+ /**
+ * Fix the missing LLC segment in deep store by asking servers to upload,
and add segment store download uri in ZK. Since uploading to segment store
involves expensive compression step (first tar up the segment and then upload),
we don't want to retry the uploading. Segment without segment store copy can
still be downloaded from peer servers.
+ * @see <a
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing
deep-store requirement for Realtime segment completion:Failure cases and
handling</a>
+ */
+ public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+ String realtimeTableName = tableConfig.getTableName();
+ if (_isStopping) {
+ LOGGER.info("Skipped fixing segment store copy of LLC segments for table
{}, because segment manager is stopping.", realtimeTableName);
+ return;
+ }
+
+ Queue<String> segmentQueue =
_llcSegmentMapForUpload.get(realtimeTableName);
+ if (segmentQueue == null || segmentQueue.isEmpty()) {
+ LOGGER.info("Skipped fixing segment store copy of LLC segments for table
{}: all segments are available in segment store.", realtimeTableName);
Review comment:
In a cluster with thousands of tables, this message will be logged
constantly. Please remove this log.
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
}
}
+
+ // Pre-fetch the LLC segments without deep store copy.
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String
tableNameWithType) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType != TableType.REALTIME) {
+ return;
+ }
+
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
Review comment:
We should check leadership for the table here. If this controller is not
the leader, remove the table's segments from the queue.
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
}
}
+
+ // Pre-fetch the LLC segments without deep store copy.
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String
tableNameWithType) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType != TableType.REALTIME) {
+ return;
+ }
+
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}",
tableNameWithType);
+ return;
+ }
+
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ if (!streamConfig.hasLowLevelConsumerType()) {
+ return;
+ }
+
+ long currentTimeMs = getCurrentTimeMs();
+ List<String> segmentNames =
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+ for (String segmentName : segmentNames) {
+ try {
+ if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+ continue;
+ }
+
+ LLCRealtimeSegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+ // Cache the committed LLC segments without segment store download
url
+ if (segmentZKMetadata.getStatus() == Status.DONE &&
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
{
+ cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+ }
+ } catch (Exception e) {
+ LOGGER.error("failed to fetch the LLC segment {} ZK metadata",
segmentName);
+ }
+ }
+ }
+
+ // Only validate recently created LLC segment for missing deep store
download url. The time range check is based on segment name. This step helps to
alleviate ZK access.
+ private boolean isLLCSegmentWithinValidationRange(String segmentName, long
currentTimeMs) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ long creationTimeMs = llcSegmentName.getCreationTimeMs();
+ return currentTimeMs - creationTimeMs <
_validationRangeForLLCSegmentsDeepStoreCopyMs;
+ }
+
+ /**
+ * Fix the missing LLC segment in deep store by asking servers to upload,
and add segment store download uri in ZK. Since uploading to segment store
involves expensive compression step (first tar up the segment and then upload),
we don't want to retry the uploading. Segment without segment store copy can
still be downloaded from peer servers.
+ * @see <a
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing
deep-store requirement for Realtime segment completion:Failure cases and
handling</a>
+ */
+ public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+ String realtimeTableName = tableConfig.getTableName();
+ if (_isStopping) {
+ LOGGER.info("Skipped fixing segment store copy of LLC segments for table
{}, because segment manager is stopping.", realtimeTableName);
+ return;
+ }
+
+ Queue<String> segmentQueue =
_llcSegmentMapForUpload.get(realtimeTableName);
+ if (segmentQueue == null || segmentQueue.isEmpty()) {
+ LOGGER.info("Skipped fixing segment store copy of LLC segments for table
{}: all segments are available in segment store.", realtimeTableName);
+ return;
+ }
+
+ // Store the segments to be fixed again in the case of fix failure, or
skip in this round
+ Queue<String> segmentsNotFixed = new LinkedList<>();
+
+ // Iterate through LLC segments and upload missing segment store copy by
following steps:
+ // 1. Ask servers which have online segment replica to upload to segment
store. Servers return segment store download url after successful uploading.
+ // 2. Update the LLC segment ZK metadata by adding segment store download
url.
+ while (!segmentQueue.isEmpty()) {
+ String segmentName = segmentQueue.poll();
+ // Check if it's null in case of the while condition doesn't stand true
anymore in the step of dequeue. Dequeue returns null if queue is empty.
+ if (segmentName == null) {
+ break;
+ }
+
+ try {
+ Stat stat = new Stat();
+ LLCRealtimeSegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+ // if the segment status is no longer DONE, or the download url is
already fixed, skip the fix for this segment.
+ if (segmentZKMetadata.getStatus() != Status.DONE ||
!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
{
Review comment:
A segment will not make it to the queue until the status is DONE. Why do
we have this check?
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
}
}
+
+ // Pre-fetch the LLC segments without deep store copy.
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String
tableNameWithType) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType != TableType.REALTIME) {
+ return;
+ }
+
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}",
tableNameWithType);
+ return;
+ }
+
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ if (!streamConfig.hasLowLevelConsumerType()) {
+ return;
+ }
+
+ long currentTimeMs = getCurrentTimeMs();
+ List<String> segmentNames =
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+ for (String segmentName : segmentNames) {
+ try {
+ if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+ continue;
+ }
+
+ LLCRealtimeSegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+ // Cache the committed LLC segments without segment store download
url
+ if (segmentZKMetadata.getStatus() == Status.DONE &&
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
{
+ cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+ }
+ } catch (Exception e) {
+ LOGGER.error("failed to fetch the LLC segment {} ZK metadata",
segmentName);
+ }
+ }
+ }
+
+ // Only validate recently created LLC segment for missing deep store
download url. The time range check is based on segment name. This step helps to
alleviate ZK access.
+ private boolean isLLCSegmentWithinValidationRange(String segmentName, long
currentTimeMs) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ long creationTimeMs = llcSegmentName.getCreationTimeMs();
+ return currentTimeMs - creationTimeMs <
_validationRangeForLLCSegmentsDeepStoreCopyMs;
+ }
+
+ /**
+ * Fix the missing LLC segment in deep store by asking servers to upload,
and add segment store download uri in ZK. Since uploading to segment store
involves expensive compression step (first tar up the segment and then upload),
we don't want to retry the uploading. Segment without segment store copy can
still be downloaded from peer servers.
+ * @see <a
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing
deep-store requirement for Realtime segment completion:Failure cases and
handling</a>
+ */
+ public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+ String realtimeTableName = tableConfig.getTableName();
+ if (_isStopping) {
+ LOGGER.info("Skipped fixing segment store copy of LLC segments for table
{}, because segment manager is stopping.", realtimeTableName);
+ return;
+ }
+
+ Queue<String> segmentQueue =
_llcSegmentMapForUpload.get(realtimeTableName);
+ if (segmentQueue == null || segmentQueue.isEmpty()) {
+ LOGGER.info("Skipped fixing segment store copy of LLC segments for table
{}: all segments are available in segment store.", realtimeTableName);
+ return;
+ }
+
+ // Store the segments to be fixed again in the case of fix failure, or
skip in this round
+ Queue<String> segmentsNotFixed = new LinkedList<>();
+
+ // Iterate through LLC segments and upload missing segment store copy by
following steps:
+ // 1. Ask servers which have online segment replica to upload to segment
store. Servers return segment store download url after successful uploading.
+ // 2. Update the LLC segment ZK metadata by adding segment store download
url.
+ while (!segmentQueue.isEmpty()) {
+ String segmentName = segmentQueue.poll();
+ // Check if it's null in case of the while condition doesn't stand true
anymore in the step of dequeue. Dequeue returns null if queue is empty.
+ if (segmentName == null) {
+ break;
+ }
+
+ try {
+ Stat stat = new Stat();
+ LLCRealtimeSegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+ // if the segment status is no longer DONE, or the download url is
already fixed, skip the fix for this segment.
+ if (segmentZKMetadata.getStatus() != Status.DONE ||
!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
{
+ continue;
Review comment:
Log a message here, and at every point below as needed.
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
##########
@@ -67,6 +67,22 @@ public RealtimeSegmentValidationManager(ControllerConf
config, PinotHelixResourc
Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
}
+ @Override
+ protected void setUpTask() {
+ // Prefetch the LLC segment without segment store copy from ZK, which
helps to alleviate ZK access.
Review comment:
Are you sure that controller leadership for the table is established
by this time? We should do this in a callback from LeadControllerManager. That
callback still needs to be written; as far as I know, none exists right now.
@jackjlli can help.
##########
File path:
pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -727,6 +727,23 @@ public SimpleHttpResponse uploadSegment(URI uri, String
segmentName, InputStream
return uploadSegment(uri, segmentName, inputStream, null, parameters,
DEFAULT_SOCKET_TIMEOUT_MS);
}
+ /**
+ * Controller periodic task uses this endpoint to ask servers to upload
committed llc segment to segment store if missing.
+ * @param uri The uri to ask servers to upload segment to segment store
+ * @return the uploaded segment download url from segment store
+ * @throws URISyntaxException
+ * @throws IOException
+ * @throws HttpErrorStatusException
+ */
+ public String uploadToSegmentStore(String uri)
Review comment:
+1, I agree with Jackie.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]