mcvsubbu commented on a change in pull request #4713: Refactoring realtime
segment committer
URL: https://github.com/apache/incubator-pinot/pull/4713#discussion_r348759987
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -761,74 +763,15 @@ protected SegmentBuildDescriptor
buildSegmentInternal(boolean forCommit) {
}
}
- private SegmentCompletionProtocol.Response
doSplitCommit(SegmentCompletionProtocol.Response prevResponse) {
- final File segmentTarFile = new
File(_segmentBuildDescriptor.getSegmentTarFilePath());
- SegmentCompletionProtocol.Request.Params params = new
SegmentCompletionProtocol.Request.Params();
-
-
params.withSegmentName(_segmentNameStr).withOffset(_currentOffset).withNumRows(_numRowsConsumed)
-
.withInstanceId(_instanceId).withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis())
- .withSegmentSizeBytes(_segmentBuildDescriptor.getSegmentSizeBytes())
- .withWaitTimeMillis(_segmentBuildDescriptor.getWaitTimeMillis());
- if (_isOffHeap) {
- params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
- }
- SegmentCompletionProtocol.Response segmentCommitStartResponse =
_protocolHandler.segmentCommitStart(params);
- if (!segmentCommitStartResponse.getStatus()
-
.equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE)) {
- segmentLogger.warn("CommitStart failed with response {}",
segmentCommitStartResponse.toJsonString());
- return SegmentCompletionProtocol.RESP_FAILED;
- }
-
- params = new SegmentCompletionProtocol.Request.Params();
-
params.withOffset(_currentOffset).withSegmentName(_segmentNameStr).withInstanceId(_instanceId);
- SegmentCompletionProtocol.Response segmentCommitUploadResponse =
- _protocolHandler.segmentCommitUpload(params, segmentTarFile,
prevResponse.getControllerVipUrl());
- if (!segmentCommitUploadResponse.getStatus()
-
.equals(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS)) {
- segmentLogger.warn("Segment upload failed with response {}",
segmentCommitUploadResponse.toJsonString());
- return SegmentCompletionProtocol.RESP_FAILED;
- }
-
- params = new SegmentCompletionProtocol.Request.Params();
-
params.withInstanceId(_instanceId).withOffset(_currentOffset).withSegmentName(_segmentNameStr)
-
.withSegmentLocation(segmentCommitUploadResponse.getSegmentLocation()).withNumRows(_numRowsConsumed)
- .withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis())
- .withSegmentSizeBytes(_segmentBuildDescriptor.getSegmentSizeBytes())
- .withWaitTimeMillis(_segmentBuildDescriptor.getWaitTimeMillis());
- if (_isOffHeap) {
- params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
- }
- SegmentCompletionProtocol.Response commitEndResponse;
- if (_indexLoadingConfig.isEnableSplitCommitEndWithMetadata()) {
- commitEndResponse =
- _protocolHandler.segmentCommitEndWithMetadata(params,
_segmentBuildDescriptor.getMetadataFiles());
- } else {
- commitEndResponse = _protocolHandler.segmentCommitEnd(params);
- }
-
- if
(!commitEndResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS))
{
- segmentLogger.warn("CommitEnd failed with response {}",
commitEndResponse.toJsonString());
- return SegmentCompletionProtocol.RESP_FAILED;
- }
- return commitEndResponse;
- }
-
protected boolean commitSegment(SegmentCompletionProtocol.Response response)
{
final String segTarFileName =
_segmentBuildDescriptor.getSegmentTarFilePath();
File segTarFile = new File(segTarFileName);
if (!segTarFile.exists()) {
throw new RuntimeException("Segment file does not exist:" +
segTarFileName);
}
SegmentCompletionProtocol.Response returnedResponse;
- if (response.isSplitCommit() && _indexLoadingConfig.isEnableSplitCommit())
{
- // Send segmentStart, segmentUpload, & segmentCommitEnd to the controller
- // if that succeeds, swap in-memory segment with the one built.
- returnedResponse = doSplitCommit(response);
- } else {
- // Send segmentCommit() to the controller
- // if that succeeds, swap in-memory segment with the one built.
- returnedResponse = postSegmentCommitMsg();
- }
+
+ returnedResponse = commit(response);
Review comment:
Please declare and assign returnedResponse in a single statement. Could you
also rename this variable to commitResponse?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]