9aman commented on code in PR #15316:
URL: https://github.com/apache/pinot/pull/15316#discussion_r2007034972
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -564,77 +562,59 @@ private void doAddConsumingSegment(String segmentName)
@Override
public File downloadSegment(SegmentZKMetadata zkMetadata)
throws Exception {
- Preconditions.checkState(zkMetadata.getStatus() != Status.IN_PROGRESS,
- "Segment: %s is still IN_PROGRESS and cannot be downloaded",
zkMetadata.getSegmentName());
+ String segmentName = zkMetadata.getSegmentName();
+ Status status = zkMetadata.getStatus();
+ Preconditions.checkState(status != Status.IN_PROGRESS, "Segment: %s is
still IN_PROGRESS and cannot be downloaded",
+ segmentName);
- // Case: The commit protocol has completed, and the segment is ready to be
downloaded either
- // from deep storage or from a peer (if peer-to-peer download is enabled).
- if (zkMetadata.getStatus() == Status.DONE) {
+ // The commit protocol has completed, and the segment is ready to be
downloaded either from deep storage or from a
+ // peer (if peer-to-peer download is enabled).
+ if (status.isCompleted()) {
return super.downloadSegment(zkMetadata);
}
// The segment status is COMMITTING, indicating that the segment commit
process is incomplete.
// Attempting a waited download within the configured time limit.
- long downloadTimeoutMilliseconds =
-
getDownloadTimeOutMilliseconds(ZKMetadataProvider.getTableConfig(_propertyStore,
_tableNameWithType));
- final long startTime = System.currentTimeMillis();
- List<URI> onlineServerURIs;
- while (System.currentTimeMillis() - startTime <
downloadTimeoutMilliseconds) {
+ Preconditions.checkState(status == Status.COMMITTING, "Invalid status: %s
for segment: %s", status, segmentName);
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
+ Preconditions.checkState(tableConfig != null, "Failed to find table config
for table: %s", _tableNameWithType);
+ long downloadTimeoutMs = getDownloadTimeoutMs(tableConfig);
+ long deadlineMs = System.currentTimeMillis() + downloadTimeoutMs;
+ while (System.currentTimeMillis() < deadlineMs) {
// ZK Metadata may change during segment download process; fetch it on
every retry.
- zkMetadata = fetchZKMetadata(zkMetadata.getSegmentName());
-
- if (zkMetadata.getDownloadUrl() != null) {
- // The downloadSegment() will throw an exception in case there are
some genuine issues.
- // We don't want to retry in those scenarios and will throw an
exception
- return downloadSegmentFromDeepStore(zkMetadata);
- }
-
- if (_peerDownloadScheme != null) {
- _logger.info("Peer download is enabled for the segment: {}",
zkMetadata.getSegmentName());
- try {
- onlineServerURIs = new ArrayList<>();
-
PeerServerSegmentFinder.getOnlineServersFromExternalView(_helixManager.getClusterManagmentTool(),
- _helixManager.getClusterName(), _tableNameWithType,
zkMetadata.getSegmentName(), _peerDownloadScheme,
- onlineServerURIs);
- if (!onlineServerURIs.isEmpty()) {
- return downloadSegmentFromPeers(zkMetadata);
- }
- } catch (Exception e) {
- _logger.warn("Could not download segment: {} from peer",
zkMetadata.getSegmentName(), e);
- }
+ zkMetadata = fetchZKMetadata(segmentName);
+ if (zkMetadata.getStatus().isCompleted()) {
Review Comment:
What would happen if the `commitEndMetadata` call that updates the ZK
metadata (download URL, etc.) fails?
The segment will be stuck in the `COMMITTING` state, even though it might be
available on the lead server for peer download. We will never enter this branch, because
`zkMetadata.getStatus().isCompleted()` will always be `false`.
Missing download URLs like this are fixed during the `segment level
validations` done in RealtimeSegmentValidationManager. These runs are kept
infrequent because they scan the ZK metadata of all segments.
If we remove this check, `downloadSegment` will fail with:
```
Preconditions.checkState(downloadUrl != null,
"Failed to find download URL in ZK metadata for segment: %s of
table: %s", segmentName, _tableNameWithType);
```
Normal ingestion ensures that we have either a proper download URL or, if the
upload fails and peer download is enabled, an empty string.
This is why I wrote a separate `downloadSegment` instead of
using the parent class's implementation: reusing it would have meant changing a lot of
things for OFFLINE tables as well, since they also use the base class download function.
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -564,77 +562,59 @@ private void doAddConsumingSegment(String segmentName)
@Override
public File downloadSegment(SegmentZKMetadata zkMetadata)
throws Exception {
- Preconditions.checkState(zkMetadata.getStatus() != Status.IN_PROGRESS,
- "Segment: %s is still IN_PROGRESS and cannot be downloaded",
zkMetadata.getSegmentName());
+ String segmentName = zkMetadata.getSegmentName();
+ Status status = zkMetadata.getStatus();
+ Preconditions.checkState(status != Status.IN_PROGRESS, "Segment: %s is
still IN_PROGRESS and cannot be downloaded",
+ segmentName);
- // Case: The commit protocol has completed, and the segment is ready to be
downloaded either
- // from deep storage or from a peer (if peer-to-peer download is enabled).
- if (zkMetadata.getStatus() == Status.DONE) {
+ // The commit protocol has completed, and the segment is ready to be
downloaded either from deep storage or from a
+ // peer (if peer-to-peer download is enabled).
+ if (status.isCompleted()) {
return super.downloadSegment(zkMetadata);
}
// The segment status is COMMITTING, indicating that the segment commit
process is incomplete.
// Attempting a waited download within the configured time limit.
- long downloadTimeoutMilliseconds =
-
getDownloadTimeOutMilliseconds(ZKMetadataProvider.getTableConfig(_propertyStore,
_tableNameWithType));
- final long startTime = System.currentTimeMillis();
- List<URI> onlineServerURIs;
- while (System.currentTimeMillis() - startTime <
downloadTimeoutMilliseconds) {
+ Preconditions.checkState(status == Status.COMMITTING, "Invalid status: %s
for segment: %s", status, segmentName);
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
+ Preconditions.checkState(tableConfig != null, "Failed to find table config
for table: %s", _tableNameWithType);
+ long downloadTimeoutMs = getDownloadTimeoutMs(tableConfig);
+ long deadlineMs = System.currentTimeMillis() + downloadTimeoutMs;
+ while (System.currentTimeMillis() < deadlineMs) {
// ZK Metadata may change during segment download process; fetch it on
every retry.
- zkMetadata = fetchZKMetadata(zkMetadata.getSegmentName());
-
- if (zkMetadata.getDownloadUrl() != null) {
- // The downloadSegment() will throw an exception in case there are
some genuine issues.
- // We don't want to retry in those scenarios and will throw an
exception
- return downloadSegmentFromDeepStore(zkMetadata);
- }
-
- if (_peerDownloadScheme != null) {
- _logger.info("Peer download is enabled for the segment: {}",
zkMetadata.getSegmentName());
- try {
- onlineServerURIs = new ArrayList<>();
-
PeerServerSegmentFinder.getOnlineServersFromExternalView(_helixManager.getClusterManagmentTool(),
- _helixManager.getClusterName(), _tableNameWithType,
zkMetadata.getSegmentName(), _peerDownloadScheme,
- onlineServerURIs);
- if (!onlineServerURIs.isEmpty()) {
- return downloadSegmentFromPeers(zkMetadata);
- }
- } catch (Exception e) {
- _logger.warn("Could not download segment: {} from peer",
zkMetadata.getSegmentName(), e);
- }
+ zkMetadata = fetchZKMetadata(segmentName);
+ if (zkMetadata.getStatus().isCompleted()) {
Review Comment:
cc: @KKcorps
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]