This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 159afb7820 Cleanup segment upload logic and allow validation on
real-time table (#8695)
159afb7820 is described below
commit 159afb7820c30f5eadd75a973ccdc1d99f4e2539
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon May 16 11:14:34 2022 -0700
Cleanup segment upload logic and allow validation on real-time table (#8695)
- Cleanup the segment upload logic and add more checks on the parameters
- Refactor the segment validator to a util class to make it work on both
metadata push and real-time table
- Refine the return code documentation of the segment upload API
---
.../org/apache/pinot/common/utils/URIUtils.java | 14 +-
.../PinotSegmentUploadDownloadRestletResource.java | 262 +++++++++++----------
.../api/upload/SegmentValidationUtils.java | 94 ++++++++
.../controller/api/upload/SegmentValidator.java | 122 ----------
.../pinot/controller/api/upload/ZKOperator.java | 116 ++++-----
.../helix/core/PinotHelixResourceManager.java | 12 -
.../realtime/PinotLLCRealtimeSegmentManager.java | 16 +-
.../helix/core/util/ZKMetadataUtils.java | 12 +-
.../controller/api/upload/ZKOperatorTest.java | 40 ++--
.../helix/core/realtime/SegmentCompletionTest.java | 10 +-
10 files changed, 333 insertions(+), 365 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java
index b2617a3972..042427b772 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java
@@ -34,11 +34,9 @@ public class URIUtils {
}
/**
- * Returns the URI for the given base path and optional parts, appends the
local (file) scheme to the URI if no
- * scheme exists. All the parts will be appended to the base path with the
file separator.
+ * Returns the URI for the given path, appends the local (file) scheme to
the URI if no scheme exists.
*/
- public static URI getUri(String basePath, String... parts) {
- String path = getPath(basePath, parts);
+ public static URI getUri(String path) {
try {
URI uri = new URI(path);
if (uri.getScheme() != null) {
@@ -51,6 +49,14 @@ public class URIUtils {
}
}
+ /**
+ * Returns the URI for the given base path and optional parts, appends the
local (file) scheme to the URI if no
+ * scheme exists. All the parts will be appended to the base path with the
file separator.
+ */
+ public static URI getUri(String basePath, String... parts) {
+ return getUri(getPath(basePath, parts));
+ }
+
/**
* Returns the path for the given base path and optional parts. All the
parts will be appended to the base path with
* the file separator.
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 16f2bf1865..6739bb9cc5 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -18,8 +18,8 @@
*/
package org.apache.pinot.controller.api.resources;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
@@ -59,6 +59,7 @@ import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.metrics.ControllerMeter;
@@ -74,12 +75,13 @@ import
org.apache.pinot.controller.api.access.AccessControlFactory;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
-import org.apache.pinot.controller.api.upload.SegmentValidator;
+import org.apache.pinot.controller.api.upload.SegmentValidationUtils;
import org.apache.pinot.controller.api.upload.ZKOperator;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.metadata.DefaultMetadataExtractor;
import org.apache.pinot.core.metadata.MetadataExtractorFactory;
import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.crypt.PinotCrypter;
import org.apache.pinot.spi.crypt.PinotCrypterFactory;
@@ -186,50 +188,78 @@ public class PinotSegmentUploadDownloadRestletResource {
return builder.build();
}
- private SuccessResponse uploadSegment(@Nullable String tableName, TableType
tableType, FormDataMultiPart multiPart,
- boolean enableParallelPushProtection, HttpHeaders headers, Request
request, boolean moveSegmentToFinalLocation,
- boolean allowRefresh) {
- String uploadTypeStr = null;
- String crypterClassNameInHeader = null;
- String downloadUri = null;
- String ingestionDescriptor = null;
- if (headers != null) {
- extractHttpHeader(headers,
CommonConstants.Controller.SEGMENT_NAME_HTTP_HEADER);
- extractHttpHeader(headers,
CommonConstants.Controller.TABLE_NAME_HTTP_HEADER);
- ingestionDescriptor = extractHttpHeader(headers,
CommonConstants.Controller.INGESTION_DESCRIPTOR);
- uploadTypeStr = extractHttpHeader(headers,
FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE);
- crypterClassNameInHeader = extractHttpHeader(headers,
FileUploadDownloadClient.CustomHeaders.CRYPTER);
- downloadUri = extractHttpHeader(headers,
FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
+ private SuccessResponse uploadSegment(@Nullable String tableName, TableType
tableType,
+ @Nullable FormDataMultiPart multiPart, boolean
moveSegmentToFinalLocation, boolean enableParallelPushProtection,
+ boolean allowRefresh, HttpHeaders headers, Request request) {
+ if (StringUtils.isNotEmpty(tableName)) {
+ TableType tableTypeFromTableName =
TableNameBuilder.getTableTypeFromTableName(tableName);
+ if (tableTypeFromTableName != null && tableTypeFromTableName !=
tableType) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Table name: %s does not match table type: %s",
tableName, tableType),
+ Response.Status.BAD_REQUEST);
+ }
}
+
+ // TODO: Consider validating the segment name and table name from the
header against the actual segment
+ extractHttpHeader(headers,
CommonConstants.Controller.SEGMENT_NAME_HTTP_HEADER);
+ extractHttpHeader(headers,
CommonConstants.Controller.TABLE_NAME_HTTP_HEADER);
+
+ String uploadTypeStr = extractHttpHeader(headers,
FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE);
+ String downloadURI = extractHttpHeader(headers,
FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
+ String crypterClassNameInHeader = extractHttpHeader(headers,
FileUploadDownloadClient.CustomHeaders.CRYPTER);
+ String ingestionDescriptor = extractHttpHeader(headers,
CommonConstants.Controller.INGESTION_DESCRIPTOR);
+
File tempEncryptedFile = null;
File tempDecryptedFile = null;
File tempSegmentDir = null;
try {
ControllerFilePathProvider provider =
ControllerFilePathProvider.getInstance();
String tempFileName = TMP_DIR_PREFIX + UUID.randomUUID();
- tempDecryptedFile = new File(provider.getFileUploadTempDir(),
tempFileName);
tempEncryptedFile = new File(provider.getFileUploadTempDir(),
tempFileName + ENCRYPTED_SUFFIX);
+ tempDecryptedFile = new File(provider.getFileUploadTempDir(),
tempFileName);
tempSegmentDir = new File(provider.getUntarredFileTempDir(),
tempFileName);
- boolean uploadedSegmentIsEncrypted =
!Strings.isNullOrEmpty(crypterClassNameInHeader);
+ boolean uploadedSegmentIsEncrypted =
StringUtils.isNotEmpty(crypterClassNameInHeader);
FileUploadDownloadClient.FileUploadType uploadType =
getUploadType(uploadTypeStr);
- File dstFile = uploadedSegmentIsEncrypted ? tempEncryptedFile :
tempDecryptedFile;
+ File destFile = uploadedSegmentIsEncrypted ? tempEncryptedFile :
tempDecryptedFile;
long segmentSizeInBytes;
switch (uploadType) {
- case URI:
- downloadSegmentFileFromURI(downloadUri, dstFile, tableName);
- segmentSizeInBytes = dstFile.length();
- break;
case SEGMENT:
- createSegmentFileFromMultipart(multiPart, dstFile);
- segmentSizeInBytes = dstFile.length();
+ if (multiPart == null) {
+ throw new ControllerApplicationException(LOGGER,
+ "Segment file (as multipart/form-data) is required for SEGMENT
upload mode",
+ Response.Status.BAD_REQUEST);
+ }
+ if (!moveSegmentToFinalLocation && StringUtils.isEmpty(downloadURI))
{
+ throw new ControllerApplicationException(LOGGER,
+ "Download URI is required if segment should not be copied to
the deep store",
+ Response.Status.BAD_REQUEST);
+ }
+ createSegmentFileFromMultipart(multiPart, destFile);
+ segmentSizeInBytes = destFile.length();
+ break;
+ case URI:
+ if (StringUtils.isEmpty(downloadURI)) {
+ throw new ControllerApplicationException(LOGGER, "Download URI is
required for URI upload mode",
+ Response.Status.BAD_REQUEST);
+ }
+ downloadSegmentFileFromURI(downloadURI, destFile, tableName);
+ segmentSizeInBytes = destFile.length();
break;
case METADATA:
+ if (multiPart == null) {
+ throw new ControllerApplicationException(LOGGER,
+ "Segment metadata file (as multipart/form-data) is required
for METADATA upload mode",
+ Response.Status.BAD_REQUEST);
+ }
+ if (StringUtils.isEmpty(downloadURI)) {
+ throw new ControllerApplicationException(LOGGER, "Download URI is
required for METADATA upload mode",
+ Response.Status.BAD_REQUEST);
+ }
moveSegmentToFinalLocation = false;
- Preconditions.checkState(downloadUri != null, "Download URI is
required in segment metadata upload mode");
- createSegmentFileFromMultipart(multiPart, dstFile);
+ createSegmentFileFromMultipart(multiPart, destFile);
try {
- URI segmentURI = new URI(downloadUri);
+ URI segmentURI = new URI(downloadURI);
PinotFS pinotFS = PinotFSFactory.create(segmentURI.getScheme());
segmentSizeInBytes = pinotFS.length(segmentURI);
} catch (Exception e) {
@@ -238,7 +268,8 @@ public class PinotSegmentUploadDownloadRestletResource {
}
break;
default:
- throw new UnsupportedOperationException("Unsupported upload type: "
+ uploadType);
+ throw new ControllerApplicationException(LOGGER, "Unsupported upload
type: " + uploadType,
+ Response.Status.BAD_REQUEST);
}
if (uploadedSegmentIsEncrypted) {
@@ -253,67 +284,70 @@ public class PinotSegmentUploadDownloadRestletResource {
// Fetch table name. Try to derive the table name from the parameter and
then from segment metadata
String rawTableName;
- if (tableName != null && !tableName.isEmpty()) {
+ if (StringUtils.isNotEmpty(tableName)) {
rawTableName = TableNameBuilder.extractRawTableName(tableName);
- LOGGER.info("Uploading a segment {} to table: {}, push type {},
(Derived from API parameter)", segmentName,
- tableName, uploadType);
} else {
// TODO: remove this when we completely deprecate the table name from
segment metadata
rawTableName = segmentMetadata.getTableName();
- LOGGER.info("Uploading a segment {} to table: {}, push type {},
(Derived from segment metadata)", segmentName,
- tableName, uploadType);
+ LOGGER.warn("Table name is not provided when uploading segment: {} for
table: {}", segmentName, rawTableName);
}
-
String tableNameWithType;
if (tableType == TableType.OFFLINE) {
tableNameWithType =
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
} else {
- if (!_pinotHelixResourceManager.isUpsertTable(rawTableName)) {
- throw new UnsupportedOperationException(
- "Upload segment to non-upsert realtime table is not supported "
+ rawTableName);
- }
tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+ if (!_pinotHelixResourceManager.isUpsertTable(tableNameWithType)) {
+ throw new ControllerApplicationException(LOGGER,
+ "Cannot upload segment to non-upsert real-time table: " +
tableNameWithType, Response.Status.FORBIDDEN);
+ }
}
String clientAddress =
InetAddress.getByName(request.getRemoteAddr()).getHostName();
- LOGGER.info("Processing upload request for segment: {} of table: {} from
client: {}, ingestion descriptor: {}",
- segmentName, tableNameWithType, clientAddress, ingestionDescriptor);
-
- // Skip segment validation if upload is to an offline table and only
segment metadata. Skip segment validation for
- // realtime tables because the feature is experimental and only
applicable to upsert enabled table currently.
- if (tableType == TableType.OFFLINE && uploadType !=
FileUploadDownloadClient.FileUploadType.METADATA) {
- // Validate segment
- new SegmentValidator(_pinotHelixResourceManager, _controllerConf,
_executor, _connectionManager,
- _controllerMetrics,
_leadControllerManager.isLeaderForTable(tableNameWithType))
- .validateOfflineSegment(tableNameWithType, segmentMetadata,
tempSegmentDir);
+ LOGGER.info("Processing upload request for segment: {} of table: {} with
upload type: {} from client: {}, "
+ + "ingestion descriptor: {}", segmentName, tableNameWithType,
uploadType, clientAddress, ingestionDescriptor);
+
+ // Validate segment
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ throw new ControllerApplicationException(LOGGER, "Failed to find
table: " + tableNameWithType,
+ Response.Status.BAD_REQUEST);
+ }
+ SegmentValidationUtils.validateTimeInterval(segmentMetadata,
tableConfig);
+ if (uploadType != FileUploadDownloadClient.FileUploadType.METADATA) {
+ SegmentValidationUtils.checkStorageQuota(tempSegmentDir,
segmentMetadata, tableConfig,
+ _pinotHelixResourceManager, _controllerConf, _controllerMetrics,
_connectionManager, _executor,
+ _leadControllerManager.isLeaderForTable(tableNameWithType));
}
// Encrypt segment
- String crypterClassNameInTableConfig =
-
_pinotHelixResourceManager.getCrypterClassNameFromTableConfig(tableNameWithType);
+ String crypterNameInTableConfig =
tableConfig.getValidationConfig().getCrypterClassName();
Pair<String, File> encryptionInfo =
encryptSegmentIfNeeded(tempDecryptedFile, tempEncryptedFile,
uploadedSegmentIsEncrypted,
- crypterClassNameInHeader, crypterClassNameInTableConfig,
segmentName, tableNameWithType);
-
- String crypterClassName = encryptionInfo.getLeft();
- File finalSegmentFile = encryptionInfo.getRight();
-
- // ZK download URI
- String zkDownloadUri;
- // This boolean is here for V1 segment upload, where we keep the segment
in the downloadURI sent in the header.
- // We will deprecate this behavior eventually.
- if (!moveSegmentToFinalLocation) {
- LOGGER
- .info("Setting zkDownloadUri: to {} for segment: {} of table: {},
skipping move", downloadUri, segmentName,
- tableNameWithType);
- zkDownloadUri = downloadUri;
- } else {
- zkDownloadUri = getZkDownloadURIForSegmentUpload(rawTableName,
segmentName);
+ crypterClassNameInHeader, crypterNameInTableConfig, segmentName,
tableNameWithType);
+
+ String crypterName = encryptionInfo.getLeft();
+ File segmentFile = encryptionInfo.getRight();
+
+ // Update download URI if controller is responsible for moving the
segment to the deep store
+ URI finalSegmentLocationURI = null;
+ if (moveSegmentToFinalLocation) {
+ URI dataDirURI = provider.getDataDirURI();
+ String dataDirPath = dataDirURI.toString();
+ String encodedSegmentName = URIUtils.encode(segmentName);
+ String finalSegmentLocationPath = URIUtils.getPath(dataDirPath,
rawTableName, encodedSegmentName);
+ if
(dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME))
{
+ downloadURI = URIUtils.getPath(provider.getVip(), "segments",
rawTableName, encodedSegmentName);
+ } else {
+ downloadURI = finalSegmentLocationPath;
+ }
+ finalSegmentLocationURI = URIUtils.getUri(finalSegmentLocationPath);
}
+ LOGGER.info("Using download URI: {} for segment: {} of table: {} (move
segment: {})", downloadURI, segmentFile,
+ tableNameWithType, moveSegmentToFinalLocation);
- // Zk operations
- completeZkOperations(enableParallelPushProtection, headers,
finalSegmentFile, tableNameWithType, segmentMetadata,
- segmentName, zkDownloadUri, moveSegmentToFinalLocation,
crypterClassName, allowRefresh, segmentSizeInBytes);
+ ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager,
_controllerConf, _controllerMetrics);
+ zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata,
finalSegmentLocationURI, segmentFile,
+ downloadURI, crypterName, segmentSizeInBytes,
enableParallelPushProtection, allowRefresh, headers);
return new SuccessResponse("Successfully uploaded segment: " +
segmentName + " of table: " + tableNameWithType);
} catch (WebApplicationException e) {
@@ -338,16 +372,17 @@ public class PinotSegmentUploadDownloadRestletResource {
return value;
}
+ @VisibleForTesting
Pair<String, File> encryptSegmentIfNeeded(File tempDecryptedFile, File
tempEncryptedFile,
boolean isUploadedSegmentEncrypted, String crypterUsedInUploadedSegment,
String crypterClassNameInTableConfig,
String segmentName, String tableNameWithType) {
- boolean segmentNeedsEncryption =
!Strings.isNullOrEmpty(crypterClassNameInTableConfig);
+ boolean segmentNeedsEncryption =
StringUtils.isNotEmpty(crypterClassNameInTableConfig);
// form the output
File finalSegmentFile =
(isUploadedSegmentEncrypted || segmentNeedsEncryption) ?
tempEncryptedFile : tempDecryptedFile;
- String crypterClassName =
Strings.isNullOrEmpty(crypterClassNameInTableConfig) ?
crypterUsedInUploadedSegment
+ String crypterClassName =
StringUtils.isEmpty(crypterClassNameInTableConfig) ?
crypterUsedInUploadedSegment
: crypterClassNameInTableConfig;
ImmutablePair<String, File> out = ImmutablePair.of(crypterClassName,
finalSegmentFile);
@@ -371,19 +406,6 @@ public class PinotSegmentUploadDownloadRestletResource {
return out;
}
- private String getZkDownloadURIForSegmentUpload(String rawTableName, String
segmentName) {
- ControllerFilePathProvider provider =
ControllerFilePathProvider.getInstance();
- URI dataDirURI = provider.getDataDirURI();
- if
(dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME))
{
- return URIUtils.constructDownloadUrl(provider.getVip(), rawTableName,
segmentName);
- } else {
- // Receiving .tar.gz segment upload for pluggable storage. Download URI
is the same as final segment location.
- String downloadUri = URIUtils.getPath(dataDirURI.toString(),
rawTableName, URIUtils.encode(segmentName));
- LOGGER.info("Using download uri: {} for segment: {} of table {}",
downloadUri, segmentName, rawTableName);
- return downloadUri;
- }
- }
-
private void downloadSegmentFileFromURI(String currentSegmentLocationURI,
File destFile, String tableName)
throws Exception {
if (currentSegmentLocationURI == null ||
currentSegmentLocationURI.isEmpty()) {
@@ -406,19 +428,6 @@ public class PinotSegmentUploadDownloadRestletResource {
return
MetadataExtractorFactory.create(metadataProviderClass).extractMetadata(tempDecryptedFile,
tempSegmentDir);
}
- private void completeZkOperations(boolean enableParallelPushProtection,
HttpHeaders headers, File uploadedSegmentFile,
- String tableNameWithType, SegmentMetadata segmentMetadata, String
segmentName, String zkDownloadURI,
- boolean moveSegmentToFinalLocation, String crypter, boolean
allowRefresh, long segmentSizeInBytes)
- throws Exception {
- String basePath =
ControllerFilePathProvider.getInstance().getDataDirURI().toString();
- String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
- URI finalSegmentLocationURI = URIUtils.getUri(basePath, rawTableName,
URIUtils.encode(segmentName));
- ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager,
_controllerConf, _controllerMetrics);
- zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata,
finalSegmentLocationURI,
- uploadedSegmentFile, enableParallelPushProtection, headers,
zkDownloadURI, moveSegmentToFinalLocation, crypter,
- allowRefresh, segmentSizeInBytes);
- }
-
private void decryptFile(String crypterClassName, File tempEncryptedFile,
File tempDecryptedFile) {
PinotCrypter pinotCrypter = PinotCrypterFactory.create(crypterClassName);
LOGGER.info("Using crypter class {} for decrypting {} to {}",
pinotCrypter.getClass().getName(), tempEncryptedFile,
@@ -435,7 +444,11 @@ public class PinotSegmentUploadDownloadRestletResource {
@ApiOperation(value = "Upload a segment", notes = "Upload a segment as json")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Successfully uploaded segment"),
- @ApiResponse(code = 410, message = "Segment to refresh is deleted"),
+ @ApiResponse(code = 400, message = "Bad Request"),
+ @ApiResponse(code = 403, message = "Segment validation fails"),
+ @ApiResponse(code = 409, message = "Segment already exists or another
parallel push in progress"),
+ @ApiResponse(code = 410, message = "Segment to refresh does not exist"),
+ @ApiResponse(code = 412, message = "CRC check fails"),
@ApiResponse(code = 500, message = "Internal error")
})
// We use this endpoint with URI upload because a request sent with the
multipart content type will reject the POST
@@ -453,9 +466,8 @@ public class PinotSegmentUploadDownloadRestletResource {
@QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH)
boolean allowRefresh,
@Context HttpHeaders headers, @Context Request request, @Suspended final
AsyncResponse asyncResponse) {
try {
- asyncResponse.resume(
- uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()),
null, enableParallelPushProtection,
- headers, request, false, allowRefresh));
+ asyncResponse.resume(uploadSegment(tableName,
TableType.valueOf(tableType.toUpperCase()), null, false,
+ enableParallelPushProtection, allowRefresh, headers, request));
} catch (Throwable t) {
asyncResponse.resume(t);
}
@@ -470,7 +482,11 @@ public class PinotSegmentUploadDownloadRestletResource {
@ApiOperation(value = "Upload a segment", notes = "Upload a segment as
binary")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Successfully uploaded segment"),
- @ApiResponse(code = 410, message = "Segment to refresh is deleted"),
+ @ApiResponse(code = 400, message = "Bad Request"),
+ @ApiResponse(code = 403, message = "Segment validation fails"),
+ @ApiResponse(code = 409, message = "Segment already exists or another
parallel push in progress"),
+ @ApiResponse(code = 410, message = "Segment to refresh does not exist"),
+ @ApiResponse(code = 412, message = "CRC check fails"),
@ApiResponse(code = 500, message = "Internal error")
})
// For the multipart endpoint, we will always move segment to final location
regardless of the segment endpoint.
@@ -486,9 +502,8 @@ public class PinotSegmentUploadDownloadRestletResource {
@QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH)
boolean allowRefresh,
@Context HttpHeaders headers, @Context Request request, @Suspended final
AsyncResponse asyncResponse) {
try {
- asyncResponse.resume(
- uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()),
multiPart, enableParallelPushProtection,
- headers, request, true, allowRefresh));
+ asyncResponse.resume(uploadSegment(tableName,
TableType.valueOf(tableType.toUpperCase()), multiPart, true,
+ enableParallelPushProtection, allowRefresh, headers, request));
} catch (Throwable t) {
asyncResponse.resume(t);
}
@@ -503,7 +518,11 @@ public class PinotSegmentUploadDownloadRestletResource {
@ApiOperation(value = "Upload a segment", notes = "Upload a segment as json")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Successfully uploaded segment"),
- @ApiResponse(code = 410, message = "Segment to refresh is deleted"),
+ @ApiResponse(code = 400, message = "Bad Request"),
+ @ApiResponse(code = 403, message = "Segment validation fails"),
+ @ApiResponse(code = 409, message = "Segment already exists or another
parallel push in progress"),
+ @ApiResponse(code = 410, message = "Segment to refresh does not exist"),
+ @ApiResponse(code = 412, message = "CRC check fails"),
@ApiResponse(code = 500, message = "Internal error")
})
// We use this endpoint with URI upload because a request sent with the
multipart content type will reject the POST
@@ -522,8 +541,8 @@ public class PinotSegmentUploadDownloadRestletResource {
@Context HttpHeaders headers, @Context Request request, @Suspended final
AsyncResponse asyncResponse) {
try {
asyncResponse.resume(
- uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()),
null, enableParallelPushProtection,
- headers, request, true, allowRefresh));
+ uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()),
null, true, enableParallelPushProtection,
+ allowRefresh, headers, request));
} catch (Throwable t) {
asyncResponse.resume(t);
}
@@ -538,7 +557,11 @@ public class PinotSegmentUploadDownloadRestletResource {
@ApiOperation(value = "Upload a segment", notes = "Upload a segment as
binary")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Successfully uploaded segment"),
- @ApiResponse(code = 410, message = "Segment to refresh is deleted"),
+ @ApiResponse(code = 400, message = "Bad Request"),
+ @ApiResponse(code = 403, message = "Segment validation fails"),
+ @ApiResponse(code = 409, message = "Segment already exists or another
parallel push in progress"),
+ @ApiResponse(code = 410, message = "Segment to refresh does not exist"),
+ @ApiResponse(code = 412, message = "CRC check fails"),
@ApiResponse(code = 500, message = "Internal error")
})
// This behavior does not differ from v1 of the same endpoint.
@@ -554,9 +577,8 @@ public class PinotSegmentUploadDownloadRestletResource {
@QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH)
boolean allowRefresh,
@Context HttpHeaders headers, @Context Request request, @Suspended final
AsyncResponse asyncResponse) {
try {
- asyncResponse.resume(
- uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()),
multiPart, enableParallelPushProtection,
- headers, request, true, allowRefresh));
+ asyncResponse.resume(uploadSegment(tableName,
TableType.valueOf(tableType.toUpperCase()), multiPart, true,
+ enableParallelPushProtection, allowRefresh, headers, request));
} catch (Throwable t) {
asyncResponse.resume(t);
}
@@ -581,9 +603,8 @@ public class PinotSegmentUploadDownloadRestletResource {
}
String tableNameWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
tableName, tableType, LOGGER).get(0);
- String segmentLineageEntryId = _pinotHelixResourceManager
- .startReplaceSegments(tableNameWithType,
startReplaceSegmentsRequest.getSegmentsFrom(),
- startReplaceSegmentsRequest.getSegmentsTo(), forceCleanup);
+ String segmentLineageEntryId =
_pinotHelixResourceManager.startReplaceSegments(tableNameWithType,
+ startReplaceSegmentsRequest.getSegmentsFrom(),
startReplaceSegmentsRequest.getSegmentsTo(), forceCleanup);
return
Response.ok(JsonUtils.newObjectNode().put("segmentLineageEntryId",
segmentLineageEntryId)).build();
} catch (WebApplicationException wae) {
throw wae;
@@ -629,8 +650,8 @@ public class PinotSegmentUploadDownloadRestletResource {
public Response revertReplaceSegments(
@ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME", required = true)
@QueryParam("type") String tableTypeStr,
- @ApiParam(value = "Segment lineage entry id to revert", required = true)
- @QueryParam("segmentLineageEntryId") String segmentLineageEntryId,
+ @ApiParam(value = "Segment lineage entry id to revert", required = true)
@QueryParam("segmentLineageEntryId")
+ String segmentLineageEntryId,
@ApiParam(value = "Force revert in case the user knows that the lineage
entry is interrupted")
@QueryParam("forceRevert") @DefaultValue("false") boolean forceRevert) {
try {
@@ -652,7 +673,7 @@ public class PinotSegmentUploadDownloadRestletResource {
}
}
- private File createSegmentFileFromMultipart(FormDataMultiPart multiPart,
File dstFile)
+ private static void createSegmentFileFromMultipart(FormDataMultiPart
multiPart, File destFile)
throws IOException {
// Read segment file or segment metadata file and directly use that
information to update zk
Map<String, List<FormDataBodyPart>> segmentMetadataMap =
multiPart.getFields();
@@ -662,12 +683,11 @@ public class PinotSegmentUploadDownloadRestletResource {
}
FormDataBodyPart segmentMetadataBodyPart =
segmentMetadataMap.values().iterator().next().get(0);
try (InputStream inputStream =
segmentMetadataBodyPart.getValueAs(InputStream.class);
- OutputStream outputStream = new FileOutputStream(dstFile)) {
+ OutputStream outputStream = new FileOutputStream(destFile)) {
IOUtils.copyLarge(inputStream, outputStream);
} finally {
multiPart.cleanup();
}
- return dstFile;
}
private FileUploadDownloadClient.FileUploadType getUploadType(String
uploadTypeStr) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java
new file mode 100644
index 0000000000..22f264d256
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.upload;
+
+import java.io.File;
+import java.util.concurrent.Executor;
+import javax.ws.rs.core.Response;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.TableSizeReader;
+import org.apache.pinot.controller.validation.StorageQuotaChecker;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SegmentValidationUtils provides utility methods to validate the segment
during segment upload.
+ */
+public class SegmentValidationUtils {
+ private SegmentValidationUtils() {
+ }
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentValidationUtils.class);
+
+ public static void validateTimeInterval(SegmentMetadata segmentMetadata,
TableConfig tableConfig) {
+ Interval timeInterval = segmentMetadata.getTimeInterval();
+ if (timeInterval != null) {
+ if (!TimeUtils.isValidTimeInterval(timeInterval)) {
+ throw new ControllerApplicationException(LOGGER, String.format(
+ "Invalid segment start/end time: %s (in millis: %d/%d) for
segment: %s of table: %s, must be between: %s",
+ timeInterval, timeInterval.getStartMillis(),
timeInterval.getEndMillis(), segmentMetadata.getName(),
+ tableConfig.getTableName(), TimeUtils.VALID_TIME_INTERVAL),
Response.Status.FORBIDDEN);
+ }
+ } else {
+ String timeColumn =
tableConfig.getValidationConfig().getTimeColumnName();
+ if (timeColumn != null) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Failed to find time interval in segment: %s for
table: %s with time column: %s",
+ segmentMetadata.getName(), tableConfig.getTableName(),
timeColumn), Response.Status.FORBIDDEN);
+ }
+ }
+ }
+
+ public static void checkStorageQuota(File segmentDir, SegmentMetadata
segmentMetadata, TableConfig tableConfig,
+ PinotHelixResourceManager resourceManager, ControllerConf
controllerConf, ControllerMetrics controllerMetrics,
+ HttpConnectionManager connectionManager, Executor executor, boolean
isLeaderForTable) {
+ if (!controllerConf.getEnableStorageQuotaCheck()) {
+ return;
+ }
+ TableSizeReader tableSizeReader =
+ new TableSizeReader(executor, connectionManager, controllerMetrics,
resourceManager);
+ StorageQuotaChecker quotaChecker =
+ new StorageQuotaChecker(tableConfig, tableSizeReader,
controllerMetrics, isLeaderForTable, resourceManager);
+ StorageQuotaChecker.QuotaCheckerResponse response;
+ try {
+ response =
+ quotaChecker.isSegmentStorageWithinQuota(segmentMetadata.getName(),
FileUtils.sizeOfDirectory(segmentDir),
+ controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Caught exception while checking the storage quota for
segment: %s of table: %s",
+ segmentMetadata.getName(), tableConfig.getTableName()),
Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ if (!response._isSegmentWithinQuota) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Storage quota check failed for segment: %s of table:
%s, reason: %s",
+ segmentMetadata.getName(), tableConfig.getTableName(),
response._reason), Response.Status.FORBIDDEN);
+ }
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
deleted file mode 100644
index 3e34afdc67..0000000000
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.api.upload;
-
-import java.io.File;
-import java.util.concurrent.Executor;
-import javax.ws.rs.core.Response;
-import org.apache.commons.httpclient.HttpConnectionManager;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.exception.InvalidConfigException;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.controller.ControllerConf;
-import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.util.TableSizeReader;
-import org.apache.pinot.controller.validation.StorageQuotaChecker;
-import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.utils.TimeUtils;
-import org.joda.time.Interval;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * SegmentValidator is a util class used during segment upload. It does
verification such as a quota check and
- * validating
- * that the segment time values stored in the segment are valid.
- */
-public class SegmentValidator {
- private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentValidator.class);
- private final PinotHelixResourceManager _pinotHelixResourceManager;
- private final ControllerConf _controllerConf;
- private final Executor _executor;
- private final HttpConnectionManager _connectionManager;
- private final ControllerMetrics _controllerMetrics;
- private final boolean _isLeaderForTable;
-
- public SegmentValidator(PinotHelixResourceManager pinotHelixResourceManager,
ControllerConf controllerConf,
- Executor executor, HttpConnectionManager connectionManager,
ControllerMetrics controllerMetrics,
- boolean isLeaderForTable) {
- _pinotHelixResourceManager = pinotHelixResourceManager;
- _controllerConf = controllerConf;
- _executor = executor;
- _connectionManager = connectionManager;
- _controllerMetrics = controllerMetrics;
- _isLeaderForTable = isLeaderForTable;
- }
-
- public void validateOfflineSegment(String offlineTableName, SegmentMetadata
segmentMetadata, File tempSegmentDir) {
- TableConfig offlineTableConfig =
-
ZKMetadataProvider.getOfflineTableConfig(_pinotHelixResourceManager.getPropertyStore(),
offlineTableName);
- if (offlineTableConfig == null) {
- throw new ControllerApplicationException(LOGGER, "Failed to find table
config for table: " + offlineTableName,
- Response.Status.NOT_FOUND);
- }
-
- String segmentName = segmentMetadata.getName();
- StorageQuotaChecker.QuotaCheckerResponse quotaResponse;
- try {
- quotaResponse = checkStorageQuota(tempSegmentDir, segmentMetadata,
offlineTableConfig);
- } catch (InvalidConfigException e) {
- // Admin port is missing, return response with 500 status code.
- throw new ControllerApplicationException(LOGGER,
- "Quota check failed for segment: " + segmentName + " of table: " +
offlineTableName + ", reason: " + e
- .getMessage(), Response.Status.INTERNAL_SERVER_ERROR);
- }
- if (!quotaResponse._isSegmentWithinQuota) {
- throw new ControllerApplicationException(LOGGER,
- "Quota check failed for segment: " + segmentName + " of table: " +
offlineTableName + ", reason: "
- + quotaResponse._reason, Response.Status.FORBIDDEN);
- }
-
- // Check time interval
- // TODO: Pass in schema and check the existence of time interval when time
field exists
- Interval timeInterval = segmentMetadata.getTimeInterval();
- if (timeInterval != null && !TimeUtils.isValidTimeInterval(timeInterval)) {
- throw new ControllerApplicationException(LOGGER, String.format(
- "Invalid segment start/end time: %s (in millis: %d/%d) for segment:
%s of table: %s, must be between: %s",
- timeInterval, timeInterval.getStartMillis(),
timeInterval.getEndMillis(), segmentName, offlineTableName,
- TimeUtils.VALID_TIME_INTERVAL), Response.Status.NOT_ACCEPTABLE);
- }
- }
-
- /**
- * check if the segment represented by segmentFile is within the storage
quota
- * @param segmentFile untarred segment. This should not be null.
- * segmentFile must exist on disk and must be a directory
- * @param metadata segment metadata. This should not be null.
- * @param offlineTableConfig offline table configuration. This should not be
null.
- */
- private StorageQuotaChecker.QuotaCheckerResponse checkStorageQuota(File
segmentFile, SegmentMetadata metadata,
- TableConfig offlineTableConfig)
- throws InvalidConfigException {
- if (!_controllerConf.getEnableStorageQuotaCheck()) {
- return StorageQuotaChecker.success("Quota check is disabled");
- }
- TableSizeReader tableSizeReader =
- new TableSizeReader(_executor, _connectionManager, _controllerMetrics,
_pinotHelixResourceManager);
- StorageQuotaChecker quotaChecker = new
StorageQuotaChecker(offlineTableConfig, tableSizeReader,
- _controllerMetrics, _isLeaderForTable, _pinotHelixResourceManager);
- return quotaChecker.isSegmentStorageWithinQuota(metadata.getName(),
FileUtils.sizeOfDirectory(segmentFile),
- _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
- }
-}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 2e7ba5c76e..7a12d9e5eb 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.api.upload;
import java.io.File;
import java.net.URI;
+import javax.annotation.Nullable;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.helix.ZNRecord;
@@ -33,7 +34,6 @@ import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,26 +60,24 @@ public class ZKOperator {
}
public void completeSegmentOperations(String tableNameWithType,
SegmentMetadata segmentMetadata,
- URI finalSegmentLocationURI, File currentSegmentLocation, boolean
enableParallelPushProtection,
- HttpHeaders headers, String zkDownloadURI, boolean
moveSegmentToFinalLocation, String crypter,
- boolean allowRefresh, long segmentSizeInBytes)
+ @Nullable URI finalSegmentLocationURI, File segmentFile, String
downloadUrl, @Nullable String crypterName,
+ long segmentSizeInBytes, boolean enableParallelPushProtection, boolean
allowRefresh, HttpHeaders headers)
throws Exception {
String segmentName = segmentMetadata.getName();
- ZNRecord segmentMetadataZNRecord =
+ ZNRecord existingSegmentMetadataZNRecord =
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType,
segmentName);
boolean refreshOnly =
Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY));
- if (segmentMetadataZNRecord == null) {
+ if (existingSegmentMetadataZNRecord == null) {
// Add a new segment
if (refreshOnly) {
throw new ControllerApplicationException(LOGGER,
- "Cannot refresh non-existing segment, aborted uploading segment: "
+ segmentName + " of table: "
- + tableNameWithType, Response.Status.GONE);
+ String.format("Cannot refresh non-existing segment: %s for table:
%s", segmentName, tableNameWithType),
+ Response.Status.GONE);
}
- LOGGER.info("Adding new segment {} from table {}", segmentName,
tableNameWithType);
- processNewSegment(segmentMetadata, finalSegmentLocationURI,
currentSegmentLocation, zkDownloadURI, headers,
- crypter, tableNameWithType, segmentName, moveSegmentToFinalLocation,
enableParallelPushProtection,
- segmentSizeInBytes);
+ LOGGER.info("Adding new segment: {} to table: {}", segmentName,
tableNameWithType);
+ processNewSegment(tableNameWithType, segmentMetadata,
finalSegmentLocationURI, segmentFile, downloadUrl,
+ crypterName, segmentSizeInBytes, enableParallelPushProtection,
headers);
} else {
// Refresh an existing segment
if (!allowRefresh) {
@@ -87,26 +85,27 @@ public class ZKOperator {
// done up-front but ends up getting created before the check here, we
could incorrectly refresh an existing
// segment.
throw new ControllerApplicationException(LOGGER,
- "Segment: " + segmentName + " already exists in table: " +
tableNameWithType + ". Refresh not permitted.",
- Response.Status.CONFLICT);
+ String.format("Segment: %s already exists in table: %s. Refresh
not permitted.", segmentName,
+ tableNameWithType), Response.Status.CONFLICT);
}
- LOGGER.info("Segment {} from table {} already exists, refreshing if
necessary", segmentName, tableNameWithType);
- processExistingSegment(segmentMetadata, finalSegmentLocationURI,
currentSegmentLocation,
- enableParallelPushProtection, headers, zkDownloadURI, crypter,
tableNameWithType, segmentName,
- segmentMetadataZNRecord, moveSegmentToFinalLocation,
segmentSizeInBytes);
+ LOGGER.info("Segment: {} already exists in table: {}, refreshing it",
segmentName, tableNameWithType);
+ processExistingSegment(tableNameWithType, segmentMetadata,
existingSegmentMetadataZNRecord,
+ finalSegmentLocationURI, segmentFile, downloadUrl, crypterName,
segmentSizeInBytes,
+ enableParallelPushProtection, headers);
}
}
- private void processExistingSegment(SegmentMetadata segmentMetadata, URI
finalSegmentLocationURI,
- File currentSegmentLocation, boolean enableParallelPushProtection,
HttpHeaders headers, String downloadUrl,
- String crypter, String tableNameWithType, String segmentName, ZNRecord
znRecord,
- boolean moveSegmentToFinalLocation, long segmentSizeInBytes)
+ private void processExistingSegment(String tableNameWithType,
SegmentMetadata segmentMetadata,
+ ZNRecord existingSegmentMetadataZNRecord, @Nullable URI
finalSegmentLocationURI, File segmentFile,
+ String downloadUrl, @Nullable String crypterName, long
segmentSizeInBytes, boolean enableParallelPushProtection,
+ HttpHeaders headers)
throws Exception {
- SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(znRecord);
- long existingCrc = segmentZKMetadata.getCrc();
- int expectedVersion = znRecord.getVersion();
+ String segmentName = segmentMetadata.getName();
+ int expectedVersion = existingSegmentMetadataZNRecord.getVersion();
// Check if CRC match when IF-MATCH header is set
+ SegmentZKMetadata segmentZKMetadata = new
SegmentZKMetadata(existingSegmentMetadataZNRecord);
+ long existingCrc = segmentZKMetadata.getCrc();
checkCRC(headers, tableNameWithType, segmentName, existingCrc);
// Check segment upload start time when parallel push protection enabled
@@ -122,8 +121,8 @@ public class ZKOperator {
} else {
// Another segment upload is in progress
throw new ControllerApplicationException(LOGGER,
- "Another segment upload is in progress for segment: " +
segmentName + " of table: " + tableNameWithType
- + ", retry later", Response.Status.CONFLICT);
+ String.format("Another segment upload is in progress for
segment: %s of table: %s, retry later",
+ segmentName, tableNameWithType), Response.Status.CONFLICT);
}
}
@@ -131,7 +130,7 @@ public class ZKOperator {
segmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis());
if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType,
segmentZKMetadata, expectedVersion)) {
throw new ControllerApplicationException(LOGGER,
- "Failed to lock the segment: " + segmentName + " of table: " +
tableNameWithType + ", retry later",
+ String.format("Failed to lock the segment: %s of table: %s, retry
later", segmentName, tableNameWithType),
Response.Status.CONFLICT);
} else {
// The version will increment if the zk metadata update is successful
@@ -179,13 +178,10 @@ public class ZKOperator {
LOGGER.info(
"New segment crc {} is different than the existing segment crc {}.
Updating ZK metadata and refreshing "
+ "segment {}", newCrc, existingCrc, segmentName);
- if (moveSegmentToFinalLocation) {
- moveSegmentToPermanentDirectory(currentSegmentLocation,
finalSegmentLocationURI);
- LOGGER.info("Moved segment {} from temp location {} to {}",
segmentName,
- currentSegmentLocation.getAbsolutePath(),
finalSegmentLocationURI.getPath());
- } else {
- LOGGER.info("Skipping segment move, keeping segment {} from table {}
at {}", segmentName, tableNameWithType,
- downloadUrl);
+ if (finalSegmentLocationURI != null) {
+ moveSegmentToPermanentDirectory(segmentFile,
finalSegmentLocationURI);
+ LOGGER.info("Moved segment: {} of table: {} to final location: {}",
segmentName, tableNameWithType,
+ finalSegmentLocationURI);
}
// NOTE: Must first set the segment ZK metadata before trying to
refresh because servers and brokers rely on
@@ -196,11 +192,11 @@ public class ZKOperator {
// If no modifier is provided, use the custom map from the segment
metadata
segmentZKMetadata.setCustomMap(null);
ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType,
segmentZKMetadata, segmentMetadata, downloadUrl,
- crypter, segmentSizeInBytes);
+ crypterName, segmentSizeInBytes);
} else {
// If modifier is provided, first set the custom map from the
segment metadata, then apply the modifier
ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType,
segmentZKMetadata, segmentMetadata, downloadUrl,
- crypter, segmentSizeInBytes);
+ crypterName, segmentSizeInBytes);
segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap()));
}
if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType,
segmentZKMetadata, expectedVersion)) {
@@ -222,7 +218,7 @@ public class ZKOperator {
}
}
- private void checkCRC(HttpHeaders headers, String offlineTableName, String
segmentName, long existingCrc) {
+ private void checkCRC(HttpHeaders headers, String tableNameWithType, String
segmentName, long existingCrc) {
String expectedCrcStr = headers.getHeaderString(HttpHeaders.IF_MATCH);
if (expectedCrcStr != null) {
long expectedCrc;
@@ -230,24 +226,24 @@ public class ZKOperator {
expectedCrc = Long.parseLong(expectedCrcStr);
} catch (NumberFormatException e) {
throw new ControllerApplicationException(LOGGER,
- "Caught exception for segment: " + segmentName + " of table: " +
offlineTableName
- + " while parsing IF-MATCH CRC: \"" + expectedCrcStr + "\"",
Response.Status.PRECONDITION_FAILED);
+ String.format("Caught exception for segment: %s of table: %s while
parsing IF-MATCH CRC: \"%s\"",
+ segmentName, tableNameWithType, expectedCrcStr),
Response.Status.PRECONDITION_FAILED);
}
if (expectedCrc != existingCrc) {
throw new ControllerApplicationException(LOGGER,
- "For segment: " + segmentName + " of table: " + offlineTableName +
", expected CRC: " + expectedCrc
- + " does not match existing CRC: " + existingCrc,
Response.Status.PRECONDITION_FAILED);
+ String.format("For segment: %s of table: %s, expected CRC: %d does
not match existing CRC: %d", segmentName,
+ tableNameWithType, expectedCrc, existingCrc),
Response.Status.PRECONDITION_FAILED);
}
}
}
- private void processNewSegment(SegmentMetadata segmentMetadata, URI
finalSegmentLocationURI,
- File currentSegmentLocation, String zkDownloadURI, HttpHeaders headers,
String crypter, String tableNameWithType,
- String segmentName, boolean moveSegmentToFinalLocation, boolean
enableParallelPushProtection,
- long segmentSizeInBytes)
+ private void processNewSegment(String tableNameWithType, SegmentMetadata
segmentMetadata,
+ @Nullable URI finalSegmentLocationURI, File segmentFile, String
downloadUrl, @Nullable String crypterName,
+ long segmentSizeInBytes, boolean enableParallelPushProtection,
HttpHeaders headers)
throws Exception {
+ String segmentName = segmentMetadata.getName();
SegmentZKMetadata newSegmentZKMetadata =
- ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType,
segmentMetadata, zkDownloadURI, crypter,
+ ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType,
segmentMetadata, downloadUrl, crypterName,
segmentSizeInBytes);
// Lock if enableParallelPushProtection is true.
@@ -266,15 +262,14 @@ public class ZKOperator {
}
if (!_pinotHelixResourceManager.createSegmentZkMetadata(tableNameWithType,
newSegmentZKMetadata)) {
throw new RuntimeException(
- "Failed to create ZK metadata for segment: " + segmentName + " of
table: " + tableNameWithType);
+ String.format("Failed to create ZK metadata for segment: %s of
table: %s", segmentName, tableNameWithType));
}
- // For v1 segment uploads, we will not move the segment
- if (moveSegmentToFinalLocation) {
+ if (finalSegmentLocationURI != null) {
try {
- moveSegmentToPermanentDirectory(currentSegmentLocation,
finalSegmentLocationURI);
- LOGGER.info("Moved segment {} from temp location {} to {}",
segmentName,
- currentSegmentLocation.getAbsolutePath(),
finalSegmentLocationURI.getPath());
+ moveSegmentToPermanentDirectory(segmentFile, finalSegmentLocationURI);
+ LOGGER.info("Moved segment: {} of table: {} to final location: {}",
segmentName, tableNameWithType,
+ finalSegmentLocationURI);
} catch (Exception e) {
// Cleanup the Zk entry and the segment from the permanent directory
if it exists.
LOGGER.error("Could not move segment {} from table {} to permanent
directory", segmentName, tableNameWithType,
@@ -283,9 +278,6 @@ public class ZKOperator {
LOGGER.info("Deleted zk entry and segment {} for table {}.",
segmentName, tableNameWithType);
throw e;
}
- } else {
- LOGGER.info("Skipping segment move, keeping segment {} from table {} at
{}", segmentName, tableNameWithType,
- zkDownloadURI);
}
try {
@@ -306,18 +298,14 @@ public class ZKOperator {
_pinotHelixResourceManager.deleteSegment(tableNameWithType,
segmentName);
LOGGER.info("Deleted zk entry and segment {} for table {}.",
segmentName, tableNameWithType);
throw new RuntimeException(
- "Failed to update ZK metadata for segment: " + segmentName + " of
table: " + tableNameWithType);
+ String.format("Failed to update ZK metadata for segment: %s of
table: %s", segmentFile, tableNameWithType));
}
}
}
- private void moveSegmentToPermanentDirectory(File currentSegmentLocation,
URI finalSegmentLocationURI)
+ private void moveSegmentToPermanentDirectory(File segmentFile, URI
finalSegmentLocationURI)
throws Exception {
- PinotFS pinotFS =
PinotFSFactory.create(finalSegmentLocationURI.getScheme());
-
- // Overwrites current segment file
- LOGGER.info("Copying segment from {} to {}",
currentSegmentLocation.getAbsolutePath(),
- finalSegmentLocationURI.toString());
- pinotFS.copyFromLocalFile(currentSegmentLocation, finalSegmentLocationURI);
+ LOGGER.info("Copying segment from: {} to: {}",
segmentFile.getAbsolutePath(), finalSegmentLocationURI);
+
PinotFSFactory.create(finalSegmentLocationURI.getScheme()).copyFromLocalFile(segmentFile,
finalSegmentLocationURI);
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 4c9f61fc4a..f9d0e86447 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -689,18 +689,6 @@ public class PinotHelixResourceManager {
}
}
- /**
- * Returns the crypter class name defined in the table config for the given
table.
- *
- * @param tableNameWithType Table name with type suffix
- * @return crypter class name
- */
- public String getCrypterClassNameFromTableConfig(String tableNameWithType) {
- TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
- Preconditions.checkNotNull(tableConfig, "Table config is not available for
table '%s'", tableNameWithType);
- return tableConfig.getValidationConfig().getCrypterClassName();
- }
-
/**
* Table related APIs
*/
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index cae0b48949..2d07e20be0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -459,10 +459,6 @@ public class PinotLLCRealtimeSegmentManager {
public void commitSegmentFile(String realtimeTableName,
CommittingSegmentDescriptor committingSegmentDescriptor)
throws Exception {
Preconditions.checkState(!_isStopping, "Segment manager is stopping");
- if (isPeerSegmentDownloadScheme(committingSegmentDescriptor)) {
- LOGGER.info("No moving needed for segment on peer servers: {}",
committingSegmentDescriptor.getSegmentLocation());
- return;
- }
String rawTableName =
TableNameBuilder.extractRawTableName(realtimeTableName);
String segmentName = committingSegmentDescriptor.getSegmentName();
@@ -470,6 +466,12 @@ public class PinotLLCRealtimeSegmentManager {
// Copy the segment file to the controller
String segmentLocation = committingSegmentDescriptor.getSegmentLocation();
+ Preconditions.checkArgument(segmentLocation != null, "Segment location
must be provided");
+ if (segmentLocation.regionMatches(true, 0,
CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME, 0,
+ CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME.length())) {
+ LOGGER.info("No moving needed for segment on peer servers: {}",
segmentLocation);
+ return;
+ }
URI segmentFileURI = URIUtils.getUri(segmentLocation);
URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(),
rawTableName);
URI uriToMoveTo = URIUtils.getUri(_controllerConf.getDataDir(),
rawTableName, URIUtils.encode(segmentName));
@@ -494,12 +496,6 @@ public class PinotLLCRealtimeSegmentManager {
committingSegmentDescriptor.setSegmentLocation(uriToMoveTo.toString());
}
- private boolean isPeerSegmentDownloadScheme(CommittingSegmentDescriptor
committingSegmentDescriptor) {
- return !(committingSegmentDescriptor == null) &&
!(committingSegmentDescriptor.getSegmentLocation() == null)
- && committingSegmentDescriptor.getSegmentLocation().toLowerCase()
- .startsWith(CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME);
- }
-
/**
* This method is invoked after the realtime segment is uploaded but before
a response is sent to the server.
* It updates the propertystore segment metadata from IN_PROGRESS to DONE,
and also creates new propertystore
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
index 2628d9f6bd..11e67b1663 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
@@ -38,9 +38,9 @@ public class ZKMetadataUtils {
* Creates the segment ZK metadata for a new segment.
*/
public static SegmentZKMetadata createSegmentZKMetadata(String
tableNameWithType, SegmentMetadata segmentMetadata,
- String downloadUrl, @Nullable String crypter, long segmentSizeInBytes) {
+ String downloadUrl, @Nullable String crypterName, long
segmentSizeInBytes) {
SegmentZKMetadata segmentZKMetadata = new
SegmentZKMetadata(segmentMetadata.getName());
- updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata,
segmentMetadata, downloadUrl, crypter,
+ updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata,
segmentMetadata, downloadUrl, crypterName,
segmentSizeInBytes);
segmentZKMetadata.setPushTime(System.currentTimeMillis());
return segmentZKMetadata;
@@ -50,14 +50,14 @@ public class ZKMetadataUtils {
* Refreshes the segment ZK metadata for a segment being replaced.
*/
public static void refreshSegmentZKMetadata(String tableNameWithType,
SegmentZKMetadata segmentZKMetadata,
- SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String
crypter, long segmentSizeInBytes) {
- updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata,
segmentMetadata, downloadUrl, crypter,
+ SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String
crypterName, long segmentSizeInBytes) {
+ updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata,
segmentMetadata, downloadUrl, crypterName,
segmentSizeInBytes);
segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
}
private static void updateSegmentZKMetadata(String tableNameWithType,
SegmentZKMetadata segmentZKMetadata,
- SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String
crypter, long segmentSizeInBytes) {
+ SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String
crypterName, long segmentSizeInBytes) {
if (segmentMetadata.getTimeInterval() != null) {
segmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
segmentZKMetadata.setEndTime(segmentMetadata.getEndTime());
@@ -77,7 +77,7 @@ public class ZKMetadataUtils {
segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
segmentZKMetadata.setDownloadUrl(downloadUrl);
- segmentZKMetadata.setCrypterName(crypter);
+ segmentZKMetadata.setCrypterName(crypterName);
// Set partition metadata
Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>();
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
index c9a119b6a5..d4951587ce 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
@@ -43,6 +43,7 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+
public class ZKOperatorTest {
private static final String TABLE_NAME = "operatorTestTable";
private static final String OFFLINE_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME);
@@ -70,15 +71,13 @@ public class ZKOperatorTest {
// Test if Zk segment metadata is removed if exception is thrown when
moving segment to final location.
try {
- // Create mock finalSegmentLocationURI and currentSegmentLocation.
+ // Create mock finalSegmentLocationURI and segmentFile.
URI finalSegmentLocationURI =
URIUtils.getUri("mockPath", OFFLINE_TABLE_NAME,
URIUtils.encode(segmentMetadata.getName()));
- File currentSegmentLocation = new File(new File("foo/bar"), "mockChild");
+ File segmentFile = new File(new File("foo/bar"), "mockChild");
- zkOperator
- .completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata,
finalSegmentLocationURI,
- currentSegmentLocation, true, httpHeaders, "downloadUrl",
- true, "crypter", true, 10);
+ zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME,
segmentMetadata, finalSegmentLocationURI, segmentFile,
+ "downloadUrl", "crypter", 10, true, true, httpHeaders);
fail();
} catch (Exception e) {
// Expected
@@ -91,9 +90,8 @@ public class ZKOperatorTest {
return segmentZKMetadata == null;
}, 30_000L, "Failed to delete segmentZkMetadata.");
- zkOperator
- .completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null,
null, true, httpHeaders, "downloadUrl",
- false, "crypter", true, 10);
+ zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata,
null, null, "downloadUrl", "crypter", 10,
+ true, true, httpHeaders);
SegmentZKMetadata segmentZKMetadata =
ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME,
SEGMENT_NAME);
@@ -110,8 +108,8 @@ public class ZKOperatorTest {
// Upload the same segment with allowRefresh = false. Validate that an
exception is thrown.
try {
- zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME,
segmentMetadata, null, null, false, httpHeaders,
- "otherDownloadUrl", false, "otherCrypter", false, 10);
+ zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME,
segmentMetadata, null, null, "otherDownloadUrl",
+ "otherCrypter", 10, true, false, httpHeaders);
fail();
} catch (Exception e) {
// Expected
@@ -120,8 +118,8 @@ public class ZKOperatorTest {
// Refresh the segment with unmatched IF_MATCH field
when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("123");
try {
- zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME,
segmentMetadata, null, null, true, httpHeaders,
- "otherDownloadUrl", false, null, true, 10);
+ zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME,
segmentMetadata, null, null, "otherDownloadUrl",
+ "otherCrypter", 10, true, true, httpHeaders);
fail();
} catch (Exception e) {
// Expected
@@ -131,10 +129,11 @@ public class ZKOperatorTest {
// downloadURL and crypter
when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("12345");
when(segmentMetadata.getIndexCreationTime()).thenReturn(456L);
- zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata,
null, null, true, httpHeaders,
- "otherDownloadUrl", false, "otherCrypter", true, 10);
+ zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata,
null, null, "otherDownloadUrl",
+ "otherCrypter", 10, true, true, httpHeaders);
segmentZKMetadata =
ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME,
SEGMENT_NAME);
+ assertNotNull(segmentZKMetadata);
assertEquals(segmentZKMetadata.getCrc(), 12345L);
// Push time should not change
assertEquals(segmentZKMetadata.getPushTime(), pushTime);
@@ -152,13 +151,12 @@ public class ZKOperatorTest {
when(segmentMetadata.getCrc()).thenReturn("23456");
when(segmentMetadata.getIndexCreationTime()).thenReturn(789L);
// Add a tiny sleep to guarantee that refresh time is different from the
previous round
- // 1 second delay to avoid "org.apache.helix.HelixException: Specified
EXTERNALVIEW operatorTestTable_OFFLINE is
- // not found!" exception from being thrown sporadically.
- Thread.sleep(1000L);
- zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata,
null, null, true, httpHeaders,
- "otherDownloadUrl", false, "otherCrypter", true, 10);
+ Thread.sleep(10);
+ zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata,
null, null, "otherDownloadUrl",
+ "otherCrypter", 100, true, true, httpHeaders);
segmentZKMetadata =
ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME,
SEGMENT_NAME);
+ assertNotNull(segmentZKMetadata);
assertEquals(segmentZKMetadata.getCrc(), 23456L);
// Push time should not change
assertEquals(segmentZKMetadata.getPushTime(), pushTime);
@@ -167,7 +165,7 @@ public class ZKOperatorTest {
assertTrue(segmentZKMetadata.getRefreshTime() > refreshTime);
assertEquals(segmentZKMetadata.getDownloadUrl(), "otherDownloadUrl");
assertEquals(segmentZKMetadata.getCrypterName(), "otherCrypter");
- assertEquals(segmentZKMetadata.getSizeInBytes(), 10);
+ assertEquals(segmentZKMetadata.getSizeInBytes(), 100);
}
@AfterClass
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index f52cc2d924..2be71ffd25 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -180,7 +180,7 @@ public class SegmentCompletionTest {
_segmentCompletionMgr._seconds += 5;
params = new
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
- .withSegmentName(_segmentNameStr);
+ .withSegmentName(_segmentNameStr).withSegmentLocation("location");
response = _segmentCompletionMgr
.segmentCommitEnd(params, true, false,
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
@@ -257,7 +257,7 @@ public class SegmentCompletionTest {
_segmentCompletionMgr._seconds += 5;
params = new
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
- .withSegmentName(_segmentNameStr);
+ .withSegmentName(_segmentNameStr).withSegmentLocation("location");
response = _segmentCompletionMgr
.segmentCommitEnd(params, true, false,
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
@@ -614,7 +614,7 @@ public class SegmentCompletionTest {
_segmentCompletionMgr._seconds += 5;
params = new
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
- .withSegmentName(_segmentNameStr);
+ .withSegmentName(_segmentNameStr).withSegmentLocation("location");
response = _segmentCompletionMgr
.segmentCommitEnd(params, true, false,
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
@@ -690,7 +690,7 @@ public class SegmentCompletionTest {
_segmentCompletionMgr._seconds += 5;
params = new
Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s1Offset.toString())
- .withSegmentName(_segmentNameStr);
+ .withSegmentName(_segmentNameStr).withSegmentLocation("location");
response = _segmentCompletionMgr
.segmentCommitEnd(params, true, false,
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
@@ -977,7 +977,7 @@ public class SegmentCompletionTest {
// Commit in 15s
_segmentCompletionMgr._seconds += 15;
params = new
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
- .withSegmentName(_segmentNameStr);
+ .withSegmentName(_segmentNameStr).withSegmentLocation("location");
response = _segmentCompletionMgr.segmentCommitStart(params);
Assert.assertEquals(response.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
long commitTimeMs = (_segmentCompletionMgr._seconds - startTime) * 1000;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]