This is an automated email from the ASF dual-hosted git repository. tingchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 61d6b1075d Minor Pinot Deepstore Upload Retry Task Improvements (#10752) 61d6b1075d is described below commit 61d6b1075da2ae73d817c3154f942af2d45021b6 Author: Ankit Sultana <ankitsult...@uber.com> AuthorDate: Thu May 18 05:50:30 2023 +0530 Minor Pinot Deepstore Upload Retry Task Improvements (#10752) * Use Non-Random Path in Pinot Deepstore Upload Retry * Add configurable timeout + fix segment name in move * Fix test * Proper test which asserts segment file move * Address feedback --- .../pinot/common/utils/SimpleHttpErrorInfo.java | 2 ++ .../apache/pinot/controller/ControllerConf.java | 6 ++++ .../realtime/PinotLLCRealtimeSegmentManager.java | 34 ++++++++++++++++++---- .../PinotLLCRealtimeSegmentManagerTest.java | 31 +++++++++++++++----- .../manager/realtime/PinotFSSegmentUploader.java | 8 ++++- .../data/manager/realtime/SegmentUploader.java | 11 ++++++- .../realtime/Server2ControllerSegmentUploader.java | 13 +++++++-- .../pinot/server/api/resources/TablesResource.java | 20 +++++++++++-- 8 files changed, 105 insertions(+), 20 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/SimpleHttpErrorInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/SimpleHttpErrorInfo.java index 90597453d5..b424ada19f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SimpleHttpErrorInfo.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SimpleHttpErrorInfo.java @@ -19,6 +19,7 @@ package org.apache.pinot.common.utils; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; @@ -30,6 +31,7 @@ public class SimpleHttpErrorInfo { private String _error; @JsonCreator + @JsonIgnoreProperties(ignoreUnknown = true) public SimpleHttpErrorInfo(@JsonProperty("code") int code, @JsonProperty("error") String message) { _code = code; _error = message; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index 2dda29a5b2..8595a434af 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -213,6 +213,8 @@ public class ControllerConf extends PinotConfiguration { // Default value is false. public static final String ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT = "controller.realtime.segment.deepStoreUploadRetryEnabled"; + public static final String DEEP_STORE_RETRY_UPLOAD_TIMEOUT_MS = + "controller.realtime.segment.deepStoreUploadRetry.timeoutMs"; public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120; public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300; @@ -920,6 +922,10 @@ public class ControllerConf extends PinotConfiguration { return getProperty(ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, false); } + public int getDeepStoreRetryUploadTimeoutMs() { + return getProperty(ControllerPeriodicTasksConf.DEEP_STORE_RETRY_UPLOAD_TIMEOUT_MS, -1); + } + public long getPinotTaskManagerInitialDelaySeconds() { return getPeriodicTaskInitialDelayInSeconds(); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 455ff15894..fab1a234b0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -167,6 +167,7 @@ public class PinotLLCRealtimeSegmentManager { private final Lock[] _idealStateUpdateLocks; private final FlushThresholdUpdateManager _flushThresholdUpdateManager; private final boolean _isDeepStoreLLCSegmentUploadRetryEnabled; + private final int _deepstoreUploadRetryTimeoutMs; private final FileUploadDownloadClient _fileUploadDownloadClient; private final AtomicInteger _numCompletingSegments = new AtomicInteger(0); @@ -191,6 +192,7 @@ public class PinotLLCRealtimeSegmentManager { } _flushThresholdUpdateManager = new FlushThresholdUpdateManager(); _isDeepStoreLLCSegmentUploadRetryEnabled = controllerConf.isDeepStoreRetryUploadLLCSegmentEnabled(); + _deepstoreUploadRetryTimeoutMs = controllerConf.getDeepStoreRetryUploadTimeoutMs(); _fileUploadDownloadClient = _isDeepStoreLLCSegmentUploadRetryEnabled ? initFileUploadDownloadClient() : null; } @@ -481,12 +483,10 @@ public class PinotLLCRealtimeSegmentManager { LOGGER.info("No moving needed for segment on peer servers: {}", segmentLocation); return; } - URI segmentFileURI = URIUtils.getUri(segmentLocation); + URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), rawTableName); - URI uriToMoveTo = URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme()); - Preconditions.checkState(pinotFS.move(segmentFileURI, uriToMoveTo, true), - "Failed to move segment file for segment: %s from: %s to: %s", segmentName, segmentLocation, uriToMoveTo); + String uriToMoveTo = moveSegmentFile(rawTableName, segmentName, segmentLocation, pinotFS); // Cleans up tmp segment files under table dir. // We only clean up tmp segment files in table level dir, so there's no need to list recursively. @@ -502,7 +502,7 @@ public class PinotLLCRealtimeSegmentManager { } catch (Exception e) { LOGGER.warn("Caught exception while deleting temporary segment files for segment: {}", segmentName, e); } - committingSegmentDescriptor.setSegmentLocation(uriToMoveTo.toString()); + committingSegmentDescriptor.setSegmentLocation(uriToMoveTo); } /** @@ -1392,6 +1392,7 @@ public class PinotLLCRealtimeSegmentManager { Preconditions.checkState(!_isStopping, "Segment manager is stopping"); String realtimeTableName = tableConfig.getTableName(); + String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); // Use this retention value to avoid the data racing between segment upload and retention management. RetentionStrategy retentionStrategy = null; @@ -1404,6 +1405,8 @@ public class PinotLLCRealtimeSegmentManager { retentionMs - MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS); } + PinotFS pinotFS = PinotFSFactory.create(URIUtils.getUri(_controllerConf.getDataDir()).getScheme()); + // Iterate through LLC segments and upload missing deep store copy by following steps: // 1. Ask servers which have online segment replica to upload to deep store. // Servers return deep store download url after successful uploading. @@ -1438,9 +1441,13 @@ public class PinotLLCRealtimeSegmentManager { // Randomly ask one server to upload URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size())); String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload"); + serverUploadRequestUrl = + String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs); LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName, serverUploadRequestUrl); - String segmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl); + String tempSegmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl); + String segmentDownloadUrl = + moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, pinotFS); LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl); // Update segment ZK metadata by adding the download URL segmentZKMetadata.setDownloadUrl(segmentDownloadUrl); @@ -1555,4 +1562,19 @@ public class PinotLLCRealtimeSegmentManager { Set<String> consumingSegments = findConsumingSegments(idealState); return new PauseStatus(Boolean.parseBoolean(isTablePausedStr), consumingSegments, null); } + + @VisibleForTesting + String moveSegmentFile(String rawTableName, String segmentName, String segmentLocation, PinotFS pinotFS) + throws IOException { + URI segmentFileURI = URIUtils.getUri(segmentLocation); + URI uriToMoveTo = createSegmentPath(rawTableName, segmentName); + Preconditions.checkState(pinotFS.move(segmentFileURI, uriToMoveTo, true), + "Failed to move segment file for segment: %s from: %s to: %s", segmentName, segmentLocation, uriToMoveTo); + return uriToMoveTo.toString(); + } + + @VisibleForTesting + URI createSegmentPath(String rawTableName, String segmentName) { + return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); + } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index eee1f2a8a8..2223c9e341 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -89,7 +90,12 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.testng.Assert.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; public class PinotLLCRealtimeSegmentManagerTest { @@ -948,6 +954,7 @@ public class PinotLLCRealtimeSegmentManagerTest { ControllerConf controllerConfig = new ControllerConf(); controllerConfig.setProperty( ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, true); + controllerConfig.setDataDir(TEMP_DIR.toString()); FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(pinotHelixResourceManager, controllerConfig); Assert.assertTrue(segmentManager.isDeepStoreLLCSegmentUploadRetryEnabled()); @@ -990,11 +997,17 @@ public class PinotLLCRealtimeSegmentManagerTest { "segments", REALTIME_TABLE_NAME, segmentsZKMetadata.get(0).getSegmentName(), - "upload"); - String segmentDownloadUrl0 = String.format("segmentDownloadUr_%s", segmentsZKMetadata.get(0) - .getSegmentName()); + "upload") + "?uploadTimeoutMs=-1"; + // tempSegmentFileLocation is the location where the segment uploader will upload the segment. This usually ends + // with a random UUID + File tempSegmentFileLocation = new File(TEMP_DIR, segmentsZKMetadata.get(0).getSegmentName() + UUID.randomUUID()); + FileUtils.write(tempSegmentFileLocation, "test"); + // After the deep-store retry task gets the segment location returned by Pinot server, it will move the segment to + // its final location. This is the expected segment location. + String expectedSegmentLocation = segmentManager.createSegmentPath(RAW_TABLE_NAME, + segmentsZKMetadata.get(0).getSegmentName()).toString(); when(segmentManager._mockedFileUploadDownloadClient - .uploadToSegmentStore(serverUploadRequestUrl0)).thenReturn(segmentDownloadUrl0); + .uploadToSegmentStore(serverUploadRequestUrl0)).thenReturn(tempSegmentFileLocation.getPath()); // Change 2nd segment status to be DONE, but with default peer download url. // Verify later the download url isn't fixed after upload failure. @@ -1013,7 +1026,7 @@ public class PinotLLCRealtimeSegmentManagerTest { "segments", REALTIME_TABLE_NAME, segmentsZKMetadata.get(1).getSegmentName(), - "upload"); + "upload") + "?uploadTimeoutMs=-1"; when(segmentManager._mockedFileUploadDownloadClient .uploadToSegmentStore(serverUploadRequestUrl1)) .thenThrow(new HttpErrorStatusException( @@ -1041,11 +1054,15 @@ public class PinotLLCRealtimeSegmentManagerTest { when(pinotHelixResourceManager.getTableConfig(REALTIME_TABLE_NAME)) .thenReturn(segmentManager._tableConfig); + // Verify the result segmentManager.uploadToDeepStoreIfMissing(segmentManager._tableConfig, segmentsZKMetadata); assertEquals( segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(0), null).getDownloadUrl(), - segmentDownloadUrl0); + expectedSegmentLocation); + assertFalse(tempSegmentFileLocation.exists(), + "Deep-store retry task should move the file from temp location to permanent location"); + assertEquals( segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(1), null).getDownloadUrl(), CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java index 9a34872a50..4f173521c9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java @@ -59,7 +59,13 @@ public class PinotFSSegmentUploader implements SegmentUploader { _serverMetrics = serverMetrics; } + @Override public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) { + return uploadSegment(segmentFile, segmentName, _timeoutInMs); + } + + @Override + public URI uploadSegment(File segmentFile, LLCSegmentName segmentName, int timeoutInMillis) { if (_segmentStoreUriStr == null || _segmentStoreUriStr.isEmpty()) { LOGGER.error("Missing segment store uri. Failed to upload segment file {} for {}.", segmentFile.getName(), segmentName.getSegmentName()); @@ -89,7 +95,7 @@ public class PinotFSSegmentUploader implements SegmentUploader { }; Future<URI> future = _executorService.submit(uploadTask); try { - URI segmentLocation = future.get(_timeoutInMs, TimeUnit.MILLISECONDS); + URI segmentLocation = future.get(timeoutInMillis, TimeUnit.MILLISECONDS); LOGGER.info("Successfully upload segment {} to {}.", segmentName, segmentLocation); _serverMetrics.addMeteredTableValue(rawTableName, segmentLocation == null ? ServerMeter.SEGMENT_UPLOAD_FAILURE : ServerMeter.SEGMENT_UPLOAD_SUCCESS, 1); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java index 44d0a36067..63bbe99a5b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java @@ -24,6 +24,15 @@ import org.apache.pinot.common.utils.LLCSegmentName; public interface SegmentUploader { - // Returns the URI of the uploaded segment. null if the upload fails. + + /** + * Uploads the given segmentFile to the deep-store. Returns the URI where the segment is uploaded. + */ URI uploadSegment(File segmentFile, LLCSegmentName segmentName); + + /** + * Uploads the given segmentFile to the deep-store. Returns the URI where the segment is uploaded. The upload will + * wait for the specified timeout. + */ + URI uploadSegment(File segmentFile, LLCSegmentName segmentName, int timeoutInMillis); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java index 5aa5b8b266..4b00563125 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java @@ -64,7 +64,12 @@ public class Server2ControllerSegmentUploader implements SegmentUploader { @Override public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) { - SegmentCompletionProtocol.Response response = uploadSegmentToController(segmentFile); + return uploadSegment(segmentFile, segmentName, _segmentUploadRequestTimeoutMs); + } + + @Override + public URI uploadSegment(File segmentFile, LLCSegmentName segmentName, int timeoutInMillis) { + SegmentCompletionProtocol.Response response = uploadSegmentToController(segmentFile, timeoutInMillis); if (response.getStatus() == SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS) { try { URI uri = new URI(response.getSegmentLocation()); @@ -79,12 +84,16 @@ public class Server2ControllerSegmentUploader implements SegmentUploader { } public SegmentCompletionProtocol.Response uploadSegmentToController(File segmentFile) { + return uploadSegmentToController(segmentFile, _segmentUploadRequestTimeoutMs); + } + + private SegmentCompletionProtocol.Response uploadSegmentToController(File segmentFile, int timeoutInMillis) { SegmentCompletionProtocol.Response response; long startTime = System.currentTimeMillis(); try { String responseStr = _fileUploadDownloadClient .uploadSegment(_controllerSegmentUploadCommitUrl, _segmentName, segmentFile, - AuthProviderUtils.toRequestHeaders(_authProvider), null, _segmentUploadRequestTimeoutMs).getResponse(); + AuthProviderUtils.toRequestHeaders(_authProvider), null, timeoutInMillis).getResponse(); response = SegmentCompletionProtocol.Response.fromJsonString(responseStr); _segmentLogger.info("Controller response {} for {}", response.toJsonString(), _controllerSegmentUploadCommitUrl); if (response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER)) { diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java index d9f0b6c76e..e518d2d6b1 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java @@ -469,7 +469,14 @@ public class TablesResource { * when segment store copy is unavailable for committed low level consumer segments. * Please note that invocation of this endpoint may cause query performance to suffer, since we tar up the segment * to upload it. - * @see <a>href="https://tinyurl.com/f63ru4sb</a> + * + * @see <a href="https://tinyurl.com/f63ru4sb></a> + * @param realtimeTableName table name with type. + * @param segmentName name of the segment to be uploaded + * @param timeoutMs timeout for the segment upload to the deep-store. If this is negative, the default timeout + * would be used. + * @return full url where the segment is uploaded + * @throws Exception if an error occurred during the segment upload. */ @POST @Path("/segments/{realtimeTableName}/{segmentName}/upload") @@ -485,7 +492,8 @@ public class TablesResource { public String uploadLLCSegment( @ApiParam(value = "Name of the REALTIME table", required = true) @PathParam("realtimeTableName") String realtimeTableName, - @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") String segmentName) + @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") String segmentName, + @QueryParam("uploadTimeoutMs") @DefaultValue("-1") int timeoutMs) throws Exception { LOGGER.info("Received a request to upload low level consumer segment {} for table {}", segmentName, realtimeTableName); @@ -527,7 +535,13 @@ public class TablesResource { // Use segment uploader to upload the segment tar file to segment store and return the segment download url. SegmentUploader segmentUploader = _serverInstance.getInstanceDataManager().getSegmentUploader(); - URI segmentDownloadUrl = segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(segmentName)); + URI segmentDownloadUrl; + if (timeoutMs <= 0) { + // Use default timeout if passed timeout is not positive + segmentDownloadUrl = segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(segmentName)); + } else { + segmentDownloadUrl = segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(segmentName), timeoutMs); + } if (segmentDownloadUrl == null) { throw new WebApplicationException( String.format("Failed to upload table %s segment %s to segment store", realtimeTableName, segmentName), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org