mcvsubbu commented on a change in pull request #6778:
URL: https://github.com/apache/incubator-pinot/pull/6778#discussion_r655517240
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
##########
@@ -142,6 +148,8 @@ private static long getRandomInitialDelayInSeconds() {
private static final int
DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
private static final int DEFAULT_SEGMENT_RELOCATOR_FREQUENCY_IN_SECONDS =
60 * 60;
+
+ private static final int
DEFAULT_VALIDATION_RANGE_IN_DAYS_TO_CHECK_MISSING_SEGMENT_STORE_COPY = 3;
Review comment:
Please keep the units at the end of the member name (*_IN_DAYS).
Also, try to shorten the name if possible.
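
As an illustration of the naming convention requested here, a minimal sketch follows. The constant name `DEFAULT_DEEP_STORE_RETRY_RANGE_IN_DAYS` and the class `ValidationRangeConf` are hypothetical and are not taken from the PR; they only show a shorter name with the unit suffix last, plus a conversion to milliseconds.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical holder class, not part of ControllerConf.
class ValidationRangeConf {
  // Assumed name: shorter than the original, with the unit (*_IN_DAYS) at the end.
  static final int DEFAULT_DEEP_STORE_RETRY_RANGE_IN_DAYS = 3;

  // Converts a range given in days to milliseconds.
  static long rangeMs(int days) {
    return TimeUnit.DAYS.toMillis(days);
  }

  public static void main(String[] args) {
    System.out.println(rangeMs(DEFAULT_DEEP_STORE_RETRY_RANGE_IN_DAYS)); // 259200000
  }
}
```

Keeping the unit as the final token makes the unit visible wherever the constant is used, which matters when the value is later converted.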
##########
File path:
pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
##########
@@ -67,6 +67,12 @@
// Percentage of segments we failed to get size for
TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT("TableStorageEstMissingSegmentPercent",
false),
+ // Number of errors during segment store upload retry of LLC segment
+
NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY("LLCSegmentDeepStoreUploadRetryError",
false),
Review comment:
Why gauge? These two should be meters.
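
The distinction behind this comment can be sketched in plain Java: a gauge reports a point-in-time value that each update overwrites, while a meter marks events and its count only grows. `SimpleGauge` and `SimpleMeter` below are hypothetical stand-ins, not Pinot's metrics classes, and the meter here tracks only a count (real meters typically also derive rates).

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical gauge: holds the latest observed value.
class SimpleGauge {
  private final AtomicLong _value = new AtomicLong();

  void set(long value) {
    _value.set(value);
  }

  long get() {
    return _value.get();
  }
}

// Hypothetical meter: accumulates the number of events marked so far.
class SimpleMeter {
  private final AtomicLong _count = new AtomicLong();

  void mark(long n) {
    _count.addAndGet(n);
  }

  long count() {
    return _count.get();
  }
}

class GaugeVsMeterDemo {
  public static void main(String[] args) {
    SimpleGauge pendingSegments = new SimpleGauge();
    pendingSegments.set(5);
    pendingSegments.set(3); // overwrites the previous value

    SimpleMeter uploadErrors = new SimpleMeter();
    uploadErrors.mark(1);
    uploadErrors.mark(1); // each error adds to the running count

    System.out.println(pendingSegments.get()); // 3
    System.out.println(uploadErrors.count());  // 2
  }
}
```

An error count is a stream of events, so the accumulating shape fits it better than a value that is set and overwritten.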
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1287,147 @@ private int
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
}
}
+
+ // Pre-fetch the LLC segments without deep store copy.
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String
tableNameWithType) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType != TableType.REALTIME) {
+ return;
+ }
+
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}",
tableNameWithType);
+ return;
+ }
+
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ if (!streamConfig.hasLowLevelConsumerType()) {
+ return;
+ }
+
+ long currentTimeMs = getCurrentTimeMs();
+ List<String> segmentNames =
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+ for (String segmentName : segmentNames) {
+ try {
+ if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+ continue;
+ }
+
+ LLCRealtimeSegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+ // Cache the committed LLC segments without segment store download
url
+ if (segmentZKMetadata.getStatus() == Status.DONE &&
+
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
{
+ cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+ }
+ } catch (Exception e) {
+ _controllerMetrics.addValueToTableGauge(tableNameWithType,
+
ControllerGauge.NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_ZK_METADATA_PREFETCH, 1L);
+ LOGGER.error("Failed to fetch the LLC segment {} ZK metadata",
segmentName);
+ }
+ }
+ }
+
+ /**
+ * Only validate recently created LLC segment for missing deep store
download url.
+ * The time range check is based on segment name. This step helps to
alleviate ZK access.
+ */
+ private boolean isLLCSegmentWithinValidationRange(String segmentName, long
currentTimeMs) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ long creationTimeMs = llcSegmentName.getCreationTimeMs();
+ return currentTimeMs - creationTimeMs <
_validationRangeForLLCSegmentsDeepStoreCopyMs;
+ }
+
+ /**
+ * Fix the missing LLC segment in deep store by asking servers to upload,
and add segment store download uri in ZK.
+ * Since uploading to segment store involves expensive compression step
(first tar up the segment and then upload), we don't want to retry the
uploading.
+ * Segment without segment store copy can still be downloaded from peer
servers.
+ * @see <a
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing
deep-store requirement for Realtime segment completion:Failure cases and
handling</a>
+ */
+ public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+ String realtimeTableName = tableConfig.getTableName();
+ if (_isStopping) {
+ LOGGER.info("Skipped fixing segment store copy of LLC segments for table
{}, because segment manager is stopping.", realtimeTableName);
Review comment:
If there is nothing to fix, this message is misleading: an operator
reading it will think that something needs to be fixed for the table.
Move this check below line 1358, after you have ascertained that there
is something to fix for the table.
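
A minimal sketch of the reordering requested here, under stated assumptions: `UploadSketch`, its fields, and the in-memory `_log` list (standing in for `LOGGER.info`) are hypothetical simplifications, not the actual `PinotLLCRealtimeSegmentManager` code. The point is only the order of the two checks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the segment manager.
class UploadSketch {
  final Map<String, Queue<String>> _segmentsToUpload = new ConcurrentHashMap<>();
  final List<String> _log = new ArrayList<>(); // stands in for LOGGER.info
  volatile boolean _isStopping = false;

  void uploadIfMissing(String tableName) {
    Queue<String> queue = _segmentsToUpload.get(tableName);
    if (queue == null || queue.isEmpty()) {
      return; // nothing to fix for this table, so log nothing
    }
    // Only now is it known that there is something to fix.
    if (_isStopping) {
      _log.add("Skipped fixing deep store copy of LLC segments for table " + tableName
          + " because segment manager is stopping");
      return;
    }
    // ... ask servers to upload the queued segments (omitted in this sketch) ...
  }
}
```

With this ordering, the "skipped" message appears only for tables that actually have segments pending upload.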
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1287,147 @@ private int
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
}
}
+
+ // Pre-fetch the LLC segments without deep store copy.
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String
tableNameWithType) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType != TableType.REALTIME) {
+ return;
+ }
+
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}",
tableNameWithType);
+ return;
+ }
+
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ if (!streamConfig.hasLowLevelConsumerType()) {
+ return;
+ }
+
+ long currentTimeMs = getCurrentTimeMs();
+ List<String> segmentNames =
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+ for (String segmentName : segmentNames) {
+ try {
+ if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+ continue;
+ }
+
+ LLCRealtimeSegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+ // Cache the committed LLC segments without segment store download
url
+ if (segmentZKMetadata.getStatus() == Status.DONE &&
+
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
{
+ cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+ }
+ } catch (Exception e) {
+ _controllerMetrics.addValueToTableGauge(tableNameWithType,
+
ControllerGauge.NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_ZK_METADATA_PREFETCH, 1L);
+ LOGGER.error("Failed to fetch the LLC segment {} ZK metadata",
segmentName);
+ }
+ }
+ }
+
+ /**
+ * Only validate recently created LLC segment for missing deep store
download url.
+ * The time range check is based on segment name. This step helps to
alleviate ZK access.
+ */
+ private boolean isLLCSegmentWithinValidationRange(String segmentName, long
currentTimeMs) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ long creationTimeMs = llcSegmentName.getCreationTimeMs();
+ return currentTimeMs - creationTimeMs <
_validationRangeForLLCSegmentsDeepStoreCopyMs;
+ }
+
+ /**
+ * Fix the missing LLC segment in deep store by asking servers to upload,
and add segment store download uri in ZK.
+ * Since uploading to segment store involves expensive compression step
(first tar up the segment and then upload), we don't want to retry the
uploading.
+ * Segment without segment store copy can still be downloaded from peer
servers.
+ * @see <a
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing
deep-store requirement for Realtime segment completion:Failure cases and
handling</a>
+ */
+ public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+ String realtimeTableName = tableConfig.getTableName();
+ if (_isStopping) {
+ LOGGER.info("Skipped fixing segment store copy of LLC segments for table
{}, because segment manager is stopping.", realtimeTableName);
+ return;
+ }
+
+ Queue<String> segmentQueue =
_llcSegmentMapForUpload.get(realtimeTableName);
+ if (segmentQueue == null || segmentQueue.isEmpty()) {
+ return;
+ }
+
+ // Store the segments to be fixed again in the case of fix failure, or
skip in this round
+ Queue<String> segmentsNotFixed = new LinkedList<>();
+ RetentionStrategy retentionStrategy = new TimeRetentionStrategy(
+
TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit().toUpperCase()),
+
Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue()));
+
+ // Iterate through LLC segments and upload missing segment store copy by
following steps:
+ // 1. Ask servers which have online segment replica to upload to segment
store. Servers return segment store download url after successful uploading.
+ // 2. Update the LLC segment ZK metadata by adding segment store download
url.
+ while (!segmentQueue.isEmpty()) {
+ String segmentName = segmentQueue.poll();
+ // Check if it's null in case of the while condition doesn't stand true
anymore in the step of dequeue. Dequeue returns null if queue is empty.
+ if (segmentName == null) {
+ break;
+ }
+
+ try {
+ Stat stat = new Stat();
+ LLCRealtimeSegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+ // if the download url is already fixed, skip the fix for this segment.
+ if
(!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
{
+ LOGGER.info("Skipped fixing LLC segment {} whose deep store download
url is already available", segmentName);
+ continue;
+ }
+ // skip the fix for the segment if it is already out of retention.
+ if (retentionStrategy.isPurgeable(realtimeTableName,
segmentZKMetadata)) {
Review comment:
What happens if the retention is a short time away, and the retention
manager walks in and starts to remove the segment while we are trying to upload
it? I think we have a race condition here that we need to think about.
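
One possible mitigation, sketched under assumptions and not taken from the PR: skip segments whose remaining retention is shorter than a safety buffer, so an upload is not started just before the retention manager may delete the segment. `RetentionGuard`, `BUFFER_MS`, and the one-hour value are hypothetical. This narrows the window but does not remove the race; real exclusion would need coordination with the retention manager.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Pinot.
class RetentionGuard {
  // Assumed safety margin; the right value would depend on upload duration.
  static final long BUFFER_MS = TimeUnit.HOURS.toMillis(1);

  // Returns true if the segment will stay within retention for longer than the buffer.
  static boolean isSafeToUpload(long segmentEndTimeMs, long retentionMs, long nowMs) {
    long purgeTimeMs = segmentEndTimeMs + retentionMs;
    return purgeTimeMs - nowMs > BUFFER_MS;
  }

  public static void main(String[] args) {
    long retentionMs = TimeUnit.DAYS.toMillis(1);
    // Two hours of retention left: above the one-hour buffer.
    System.out.println(isSafeToUpload(0L, retentionMs, TimeUnit.HOURS.toMillis(22))); // true
    // Thirty minutes of retention left: below the buffer.
    System.out.println(isSafeToUpload(0L, retentionMs, TimeUnit.MINUTES.toMillis(1410))); // false
  }
}
```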
##########
File path:
pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -727,6 +727,23 @@ public SimpleHttpResponse uploadSegment(URI uri, String
segmentName, InputStream
return uploadSegment(uri, segmentName, inputStream, null, parameters,
DEFAULT_SOCKET_TIMEOUT_MS);
}
+ /**
+ * Controller periodic task uses this endpoint to ask servers to upload
committed llc segment to segment store if missing.
+ * @param uri The uri to ask servers to upload segment to segment store
+ * @return the uploaded segment download url from segment store
+ * @throws URISyntaxException
+ * @throws IOException
+ * @throws HttpErrorStatusException
+ */
+ public String uploadToSegmentStore(String uri)
Review comment:
How was this resolved?
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1287,147 @@ private int
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
}
}
+
+ // Pre-fetch the LLC segments without deep store copy.
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String
tableNameWithType) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType != TableType.REALTIME) {
+ return;
+ }
+
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}",
tableNameWithType);
+ return;
+ }
+
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ if (!streamConfig.hasLowLevelConsumerType()) {
+ return;
+ }
+
+ long currentTimeMs = getCurrentTimeMs();
+ List<String> segmentNames =
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+ for (String segmentName : segmentNames) {
+ try {
+ if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+ continue;
+ }
+
+ LLCRealtimeSegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+ // Cache the committed LLC segments without segment store download
url
+ if (segmentZKMetadata.getStatus() == Status.DONE &&
+
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
{
+ cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+ }
+ } catch (Exception e) {
+ _controllerMetrics.addValueToTableGauge(tableNameWithType,
+
ControllerGauge.NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_ZK_METADATA_PREFETCH, 1L);
+ LOGGER.error("Failed to fetch the LLC segment {} ZK metadata",
segmentName);
+ }
+ }
+ }
+
+ /**
+ * Only validate recently created LLC segment for missing deep store
download url.
+ * The time range check is based on segment name. This step helps to
alleviate ZK access.
+ */
+ private boolean isLLCSegmentWithinValidationRange(String segmentName, long
currentTimeMs) {
Review comment:
Remove the method and fold the logic into the place where it is called.
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -135,9 +158,20 @@
private final Lock[] _idealStateUpdateLocks;
private final TableConfigCache _tableConfigCache;
private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
+ private final boolean _isUploadingRealtimeMissingSegmentStoreCopyEnabled;
private volatile boolean _isStopping = false;
private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
+ private FileUploadDownloadClient _fileUploadDownloadClient;
+ /**
+ * Map caching the LLC segment names without deep store download uri.
+ * Controller gets the LLC segment names from this map, and asks servers to
upload the segments to segment store.
Review comment:
```suggestion
* Controller gets the LLC segment names from this map, and asks servers
to upload the segments to deep store.
```
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -122,6 +137,14 @@
* The segment will be eligible for repairs by the validation manager, if
the time exceeds this value
*/
private static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000L; //
5 MINUTES
+ /**
+ * Controller waits this amount of time before asking servers to upload LLC
segments without deep store copy.
+ * The reason is after step 1 of segment completion is done (segment ZK
metadata status changed to be DONE),
+ * servers may be still in the process of loading segments.
Review comment:
```suggestion
* servers may be still in the process of transitioning segments from
CONSUMING to ONLINE states
```
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -135,9 +158,20 @@
private final Lock[] _idealStateUpdateLocks;
private final TableConfigCache _tableConfigCache;
private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
+ private final boolean _isUploadingRealtimeMissingSegmentStoreCopyEnabled;
private volatile boolean _isStopping = false;
private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
+ private FileUploadDownloadClient _fileUploadDownloadClient;
+ /**
+ * Map caching the LLC segment names without deep store download uri.
+ * Controller gets the LLC segment names from this map, and asks servers to
upload the segments to segment store.
+ * This helps to alleviates excessive ZK access when fetching LLC segment
list.
+ * Key: table name; Value: LLC segment names to be uploaded to segment store.
Review comment:
```suggestion
* Key: table name; Value: LLC segment names to be uploaded to deep store.
```
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
##########
@@ -121,6 +121,12 @@
public static final String SEGMENT_RELOCATOR_INITIAL_DELAY_IN_SECONDS =
"controller.segmentRelocator.initialDelayInSeconds";
+ // configs for uploading missing LLC segments copy to segment store
+ public static final String
ENABLE_UPLOAD_MISSING_LLC_SEGMENT_TO_SEGMENT_STORE =
Review comment:
nit: ENABLE_DEEP_STORE_LLC_SEGMENT_CHECK ? I don't know, I am just
thinking of some shorter name here.
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -135,9 +158,20 @@
private final Lock[] _idealStateUpdateLocks;
private final TableConfigCache _tableConfigCache;
private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
+ private final boolean _isUploadingRealtimeMissingSegmentStoreCopyEnabled;
private volatile boolean _isStopping = false;
private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
+ private FileUploadDownloadClient _fileUploadDownloadClient;
+ /**
+ * Map caching the LLC segment names without deep store download uri.
+ * Controller gets the LLC segment names from this map, and asks servers to
upload the segments to segment store.
+ * This helps to alleviates excessive ZK access when fetching LLC segment
list.
+ * Key: table name; Value: LLC segment names to be uploaded to segment store.
+ */
+ private Map<String, Queue<String>> _llcSegmentMapForUpload;
Review comment:
Why is the value a `Queue`? Is there a notion of ordering here?
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -122,6 +137,14 @@
* The segment will be eligible for repairs by the validation manager, if
the time exceeds this value
*/
private static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000L; //
5 MINUTES
+ /**
+ * Controller waits this amount of time before asking servers to upload LLC
segments without deep store copy.
Review comment:
```suggestion
* Controller waits this amount of time before asking servers to upload
LLC segments missing in deep store.
```
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1287,147 @@ private int
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
}
}
+
+ // Pre-fetch the LLC segments without deep store copy.
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String
tableNameWithType) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType != TableType.REALTIME) {
+ return;
+ }
+
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}",
tableNameWithType);
+ return;
+ }
+
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ if (!streamConfig.hasLowLevelConsumerType()) {
+ return;
+ }
+
+ long currentTimeMs = getCurrentTimeMs();
+ List<String> segmentNames =
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+ for (String segmentName : segmentNames) {
+ try {
+ if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+ continue;
+ }
+
+ LLCRealtimeSegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+ // Cache the committed LLC segments without segment store download
url
+ if (segmentZKMetadata.getStatus() == Status.DONE &&
+
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
{
+ cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+ }
+ } catch (Exception e) {
+ _controllerMetrics.addValueToTableGauge(tableNameWithType,
+
ControllerGauge.NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_ZK_METADATA_PREFETCH, 1L);
+ LOGGER.error("Failed to fetch the LLC segment {} ZK metadata",
segmentName);
+ }
+ }
+ }
+
+ /**
+ * Only validate recently created LLC segment for missing deep store
download url.
+ * The time range check is based on segment name. This step helps to
alleviate ZK access.
+ */
+ private boolean isLLCSegmentWithinValidationRange(String segmentName, long
currentTimeMs) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ long creationTimeMs = llcSegmentName.getCreationTimeMs();
+ return currentTimeMs - creationTimeMs <
_validationRangeForLLCSegmentsDeepStoreCopyMs;
+ }
+
+ /**
+ * Fix the missing LLC segment in deep store by asking servers to upload,
and add segment store download uri in ZK.
+ * Since uploading to segment store involves expensive compression step
(first tar up the segment and then upload), we don't want to retry the
uploading.
+ * Segment without segment store copy can still be downloaded from peer
servers.
+ * @see <a
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing
deep-store requirement for Realtime segment completion:Failure cases and
handling</a>
+ */
+ public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+ String realtimeTableName = tableConfig.getTableName();
+ if (_isStopping) {
+ LOGGER.info("Skipped fixing segment store copy of LLC segments for table
{}, because segment manager is stopping.", realtimeTableName);
+ return;
+ }
+
+ Queue<String> segmentQueue =
_llcSegmentMapForUpload.get(realtimeTableName);
+ if (segmentQueue == null || segmentQueue.isEmpty()) {
+ return;
+ }
+
+ // Store the segments to be fixed again in the case of fix failure, or
skip in this round
+ Queue<String> segmentsNotFixed = new LinkedList<>();
+ RetentionStrategy retentionStrategy = new TimeRetentionStrategy(
+
TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit().toUpperCase()),
+
Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue()));
+
+ // Iterate through LLC segments and upload missing segment store copy by
following steps:
+ // 1. Ask servers which have online segment replica to upload to segment
store. Servers return segment store download url after successful uploading.
+ // 2. Update the LLC segment ZK metadata by adding segment store download
url.
+ while (!segmentQueue.isEmpty()) {
+ String segmentName = segmentQueue.poll();
+ // Check if it's null in case of the while condition doesn't stand true
anymore in the step of dequeue. Dequeue returns null if queue is empty.
+ if (segmentName == null) {
Review comment:
I don't understand this null check. We already checked for queue empty
above. Unless there are other threads modifying the queue (in which case, we
need some precise synchronization rather than an if check everywhere), why
check for null again?
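
If other threads can modify the queue, one common pattern is to drop the separate `isEmpty()` test and let `poll()` be the only check, since `poll()` returns null on an empty queue. The sketch below assumes a thread-safe queue such as `ConcurrentLinkedQueue`; `DrainSketch` is a hypothetical name, and this is not necessarily how the PR resolved the comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper illustrating a single-check drain loop.
class DrainSketch {
  static List<String> drain(Queue<String> queue) {
    List<String> drained = new ArrayList<>();
    String segmentName;
    // poll() both tests for emptiness and dequeues, so no second null check is needed.
    while ((segmentName = queue.poll()) != null) {
      drained.add(segmentName);
    }
    return drained;
  }

  public static void main(String[] args) {
    Queue<String> queue = new ConcurrentLinkedQueue<>();
    queue.add("seg0");
    queue.add("seg1");
    System.out.println(drain(queue)); // [seg0, seg1]
  }
}
```

With a single-threaded caller the same loop still works, and the redundant check disappears either way.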
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -135,9 +158,20 @@
private final Lock[] _idealStateUpdateLocks;
private final TableConfigCache _tableConfigCache;
private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
+ private final boolean _isUploadingRealtimeMissingSegmentStoreCopyEnabled;
private volatile boolean _isStopping = false;
private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
+ private FileUploadDownloadClient _fileUploadDownloadClient;
+ /**
+ * Map caching the LLC segment names without deep store download uri.
Review comment:
```suggestion
* Map caching the LLC segment names that are missing deep store download
uri in segment metadata.
```
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1287,147 @@ private int
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
}
}
+
+ // Pre-fetch the LLC segments without deep store copy.
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String
tableNameWithType) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType != TableType.REALTIME) {
+ return;
+ }
+
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}",
tableNameWithType);
+ return;
+ }
+
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ if (!streamConfig.hasLowLevelConsumerType()) {
+ return;
+ }
+
+ long currentTimeMs = getCurrentTimeMs();
+ List<String> segmentNames =
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+ for (String segmentName : segmentNames) {
+ try {
+ if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
+ continue;
+ }
+
+ LLCRealtimeSegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+ // Cache the committed LLC segments without segment store download
url
+ if (segmentZKMetadata.getStatus() == Status.DONE &&
+
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
{
+ cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
+ }
+ } catch (Exception e) {
+ _controllerMetrics.addValueToTableGauge(tableNameWithType,
+
ControllerGauge.NUMBER_OF_ERRORS_FOR_LLC_SEGMENTS_ZK_METADATA_PREFETCH, 1L);
+ LOGGER.error("Failed to fetch the LLC segment {} ZK metadata",
segmentName);
+ }
+ }
+ }
+
+ /**
+ * Only validate recently created LLC segment for missing deep store
download url.
+ * The time range check is based on segment name. This step helps to
alleviate ZK access.
+ */
+ private boolean isLLCSegmentWithinValidationRange(String segmentName, long
currentTimeMs) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ long creationTimeMs = llcSegmentName.getCreationTimeMs();
+ return currentTimeMs - creationTimeMs <
_validationRangeForLLCSegmentsDeepStoreCopyMs;
+ }
+
+ /**
+ * Fix the missing LLC segment in deep store by asking servers to upload,
and add segment store download uri in ZK.
+ * Since uploading to segment store involves expensive compression step
(first tar up the segment and then upload), we don't want to retry the
uploading.
+ * Segment without segment store copy can still be downloaded from peer
servers.
+ * @see <a
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing
deep-store requirement for Realtime segment completion:Failure cases and
handling</a>
+ */
+ public void uploadToSegmentStoreIfMissing(TableConfig tableConfig) {
+ String realtimeTableName = tableConfig.getTableName();
+ if (_isStopping) {
+ LOGGER.info("Skipped fixing segment store copy of LLC segments for table
{}, because segment manager is stopping.", realtimeTableName);
+ return;
+ }
+
+ Queue<String> segmentQueue =
_llcSegmentMapForUpload.get(realtimeTableName);
+ if (segmentQueue == null || segmentQueue.isEmpty()) {
+ return;
+ }
+
+ // Store the segments to be fixed again in the case of fix failure, or
skip in this round
+ Queue<String> segmentsNotFixed = new LinkedList<>();
+ RetentionStrategy retentionStrategy = new TimeRetentionStrategy(
+
TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit().toUpperCase()),
+
Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue()));
+
+ // Iterate through LLC segments and upload missing segment store copy by
following steps:
+ // 1. Ask servers which have online segment replica to upload to segment
store. Servers return segment store download url after successful uploading.
+ // 2. Update the LLC segment ZK metadata by adding segment store download
url.
+ while (!segmentQueue.isEmpty()) {
+ String segmentName = segmentQueue.poll();
+ // Check if it's null in case of the while condition doesn't stand true
anymore in the step of dequeue. Dequeue returns null if queue is empty.
+ if (segmentName == null) {
+ break;
+ }
+
+ try {
+ Stat stat = new Stat();
+ LLCRealtimeSegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+ // if the download url is already fixed, skip the fix for this segment.
Review comment:
Can you elaborate on how this can happen?
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -135,9 +158,20 @@
private final Lock[] _idealStateUpdateLocks;
private final TableConfigCache _tableConfigCache;
private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
+ private final boolean _isUploadingRealtimeMissingSegmentStoreCopyEnabled;
private volatile boolean _isStopping = false;
private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
+ private FileUploadDownloadClient _fileUploadDownloadClient;
+ /**
+ * Map caching the LLC segment names without deep store download uri.
+ * Controller gets the LLC segment names from this map, and asks servers to
upload the segments to segment store.
+ * This helps to alleviates excessive ZK access when fetching LLC segment
list.
Review comment:
```suggestion
* A cache helps to alleviate excessive ZK access when fetching LLC
segment list.
```
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
##########
@@ -67,6 +67,22 @@ public RealtimeSegmentValidationManager(ControllerConf
config, PinotHelixResourc
Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
}
+ @Override
+ protected void setUpTask() {
+ // Prefetch the LLC segment without segment store copy from ZK, which
helps to alleviate ZK access.
Review comment:
Where is the TODO? Please add it here, otherwise someone going through
the code will have the same question (worse, may make the same assumption in
other places). Make it clear that this may not work all the time and it is a
TODO to take action when leadership is established.
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1214,4 +1276,130 @@ private int
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
}
}
+
+ // Pre-fetch the LLC segments without deep store copy.
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String
tableNameWithType) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType != TableType.REALTIME) {
+ return;
+ }
+
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}",
tableNameWithType);
+ return;
+ }
+
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ if (!streamConfig.hasLowLevelConsumerType()) {
+ return;
+ }
+
+ long currentTimeMs = getCurrentTimeMs();
+ List<String> segmentNames =
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+ for (String segmentName : segmentNames) {
+ try {
+ if (!isLLCSegmentWithinValidationRange(segmentName, currentTimeMs)) {
Review comment:
This check does not belong here. It belongs in the place where you
decide to ask the server to upload to deep store. Please move it. Also, we don't
need a method for this. It is two lines long. Fold it in.
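
A sketch of what folding the check in could look like, under assumptions: the range test is written inline at the point where segments are chosen for upload, instead of in a separate helper during prefetch. `InlineRangeCheck` and the map of creation times are hypothetical; in the PR the creation time comes from `LLCSegmentName.getCreationTimeMs()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of an inlined validation-range check.
class InlineRangeCheck {
  static List<String> selectForUpload(Map<String, Long> creationTimeMsBySegment, long nowMs, long rangeMs) {
    List<String> selected = new ArrayList<>();
    for (Map.Entry<String, Long> entry : creationTimeMsBySegment.entrySet()) {
      // Inlined check: skip segments created outside the validation range.
      if (nowMs - entry.getValue() >= rangeMs) {
        continue;
      }
      selected.add(entry.getKey());
    }
    return selected;
  }
}
```

For example, with `nowMs = 1000` and `rangeMs = 500`, a segment created at 900 is selected and one created at 100 is skipped.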