mcvsubbu commented on a change in pull request #5336: URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r448097186
########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java ########## @@ -268,20 +276,82 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading } public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMetadata llcSegmentMetadata, - IndexLoadingConfig indexLoadingConfig) { + IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) { final String uri = llcSegmentMetadata.getDownloadUrl(); + if (uri != null && !uri.isEmpty()) { + try { + downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri); + } catch (Exception e) { + // Download from deep store failed; try to download from peer if peer download is setup for the table. Review comment: add a warn log here with segment name and exception. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java ########## @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.util; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.collections.ListUtils; +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixManager; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.StringUtil; +import org.apache.pinot.common.utils.helix.HelixHelper; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * PeerServerSegmentFinder discovers all the servers having the input segment in a ONLINE state through external view of + * a Pinot table. + */ +public class PeerServerSegmentFinder { + private static final Logger _logger = LoggerFactory.getLogger(PeerServerSegmentFinder.class); + /** + * + * @param segmentName + * @param downloadScheme Can be either http or https. + * @param helixManager + * @return a list of uri strings of the form http(s)://hostname:port/segments/tablenameWithType/segmentName + * for the servers hosting ONLINE segments; empty list if no such server found. 
+ */ + public static List<URI> getPeerServerURIs(String segmentName, String downloadScheme, HelixManager helixManager) { + LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); + String tableNameWithType = + TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(llcSegmentName.getTableName()); + + HelixAdmin helixAdmin = helixManager.getClusterManagmentTool(); + String clusterName = helixManager.getClusterName(); + if (clusterName == null) { + _logger.error("ClusterName not found"); + return ListUtils.EMPTY_LIST; + } + ExternalView externalViewForResource = + HelixHelper.getExternalViewForResource(helixAdmin, clusterName, tableNameWithType); + if (externalViewForResource == null) { + _logger.warn("External View not found for table {}", tableNameWithType); + return ListUtils.EMPTY_LIST; + } + List<URI> onlineServerURIs = new ArrayList<>(); + // Find out the ONLINE server serving the segment. + for (String segment : externalViewForResource.getPartitionSet()) { Review comment: Why are we looping around here? Why not just get the statemap for a segment? `ExternalView.getStateMap(segmentName)` returns a Map<String, String> that we can iterate on. ########## File path: pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java ########## @@ -75,6 +77,16 @@ public void fetchSegmentToLocal(URI uri, File dest) }); } + @Override + public void fetchSegmentToLocal(List<URI> uris, File dest) Review comment: Nice. How about changing this to apply retries with backoff but use one of the URIs at random?
Sort of similar to lines 68-77 ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java ########## @@ -268,20 +276,82 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading } public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMetadata llcSegmentMetadata, - IndexLoadingConfig indexLoadingConfig) { + IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) { final String uri = llcSegmentMetadata.getDownloadUrl(); + if (uri != null && !uri.isEmpty()) { + try { + downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri); + } catch (Exception e) { + // Download from deep store failed; try to download from peer if peer download is setup for the table. + if (isPeerSegmentDownloadEnabled(tableConfig)) { + downloadSegmentFromPeer(segmentName, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), indexLoadingConfig); Review comment: need another try/catch here for exceptions, and a similar log if that fails. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java ########## @@ -268,20 +273,81 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading } public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMetadata llcSegmentMetadata, - IndexLoadingConfig indexLoadingConfig) { + IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) { final String uri = llcSegmentMetadata.getDownloadUrl(); + if (!"PEER".equalsIgnoreCase(uri)) { + try { + downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri); + } catch (Exception e) { + // Download from deep store failed; try to download from peer if peer download is setup for the table. 
+ if (isPeerSegmentDownloadEnabled(tableConfig)) { + downloadSegmentFromPeer(segmentName, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), indexLoadingConfig); + } else { + throw e; + } + } + } else { + if (isPeerSegmentDownloadEnabled(tableConfig)) { + downloadSegmentFromPeer(segmentName, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), indexLoadingConfig); + } else { + throw new RuntimeException("Peer segment download not enabled for segment " + segmentName); + } + } + } + + private void downloadSegmentFromDeepStore(String segmentName, IndexLoadingConfig indexLoadingConfig, String uri) { File tempSegmentFolder = new File(_indexDir, "tmp-" + segmentName + "." + System.currentTimeMillis()); File tempFile = new File(_indexDir, segmentName + ".tar.gz"); - final File segmentFolder = new File(_indexDir, segmentName); - FileUtils.deleteQuietly(segmentFolder); try { SegmentFetcherFactory.fetchSegmentToLocal(uri, tempFile); _logger.info("Downloaded file from {} to {}; Length of downloaded file: {}", uri, tempFile, tempFile.length()); - TarGzCompressionUtils.unTar(tempFile, tempSegmentFolder); - _logger.info("Uncompressed file {} into tmp dir {}", tempFile, tempSegmentFolder); - FileUtils.moveDirectory(tempSegmentFolder.listFiles()[0], segmentFolder); - _logger.info("Replacing LLC Segment {}", segmentName); - replaceLLSegment(segmentName, indexLoadingConfig); + untarAndMoveSegment(segmentName, indexLoadingConfig, tempSegmentFolder, tempFile); + } catch (Exception e) { + _logger.warn("Failed to download segment {} from deep store: ", segmentName, e); + throw new RuntimeException(e); + } finally { + FileUtils.deleteQuietly(tempFile); + FileUtils.deleteQuietly(tempSegmentFolder); + } + } + + private void untarAndMoveSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, File tempSegmentFolder, + File tempFile) + throws IOException, ArchiveException { + TarGzCompressionUtils.unTar(tempFile, tempSegmentFolder); + 
_logger.info("Uncompressed file {} into tmp dir {}", tempFile, tempSegmentFolder); + final File segmentFolder = new File(_indexDir, segmentName); + FileUtils.deleteQuietly(segmentFolder); + FileUtils.moveDirectory(tempSegmentFolder.listFiles()[0], segmentFolder); + _logger.info("Replacing LLC Segment {}", segmentName); + replaceLLSegment(segmentName, indexLoadingConfig); + } + + private boolean isPeerSegmentDownloadEnabled(TableConfig tableConfig) { + return SegmentFetcherFactory.HTTP_PROTOCOL + .equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme()) + || SegmentFetcherFactory.HTTPS_PROTOCOL + .equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme()); + } + + private void downloadSegmentFromPeer(String segmentName, String downloadScheme, IndexLoadingConfig indexLoadingConfig) { + File tempSegmentFolder = new File(_indexDir, "tmp-" + segmentName + "." + System.currentTimeMillis()); + File tempFile = new File(_indexDir, segmentName + ".tar.gz"); + try { + RetryPolicies.exponentialBackoffRetryPolicy(RETRY_COUNT, RETRY_WAIT_MS, RETRY_DELAY_SCALE_FACTOR).attempt(() -> { Review comment: Move this retry logic to the BaseSegmentFetcher as I suggested there. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org