This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new da98e8b Add revertSegmentReplacement API (#7662)
da98e8b is described below
commit da98e8b117b0f992ca5bceb084dc7e60321a0164
Author: Seunghyun Lee <[email protected]>
AuthorDate: Mon Nov 1 17:14:40 2021 -0700
Add revertSegmentReplacement API (#7662)
---
.../pinot/common/lineage/LineageEntryState.java | 2 +-
.../PinotSegmentUploadDownloadRestletResource.java | 50 ++++++++++--
.../helix/core/PinotHelixResourceManager.java | 92 ++++++++++++++++++++--
.../helix/core/retention/RetentionManager.java | 29 +++----
4 files changed, 146 insertions(+), 27 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/LineageEntryState.java
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/LineageEntryState.java
index 74407c0..0d5bcd7 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/LineageEntryState.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/LineageEntryState.java
@@ -22,5 +22,5 @@ package org.apache.pinot.common.lineage;
* Enum for represent the state of lineage entry
*/
public enum LineageEntryState {
- IN_PROGRESS, COMPLETED
+ IN_PROGRESS, COMPLETED, REVERTED
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index f2c6c45..78de2fd 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -544,11 +544,15 @@ public class PinotSegmentUploadDownloadRestletResource {
@ApiOperation(value = "Start to replace segments", notes = "Start to replace
segments")
public Response startReplaceSegments(
@ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
- @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String
tableTypeStr,
+ @ApiParam(value = "OFFLINE|REALTIME", required = true)
@QueryParam("type") String tableTypeStr,
StartReplaceSegmentsRequest startReplaceSegmentsRequest) {
try {
- String tableNameWithType =
-
TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName);
+ TableType tableType = Constants.validateTableType(tableTypeStr);
+ if (tableType == null) {
+ throw new ControllerApplicationException(LOGGER, "Table type should
either be offline or realtime",
+ Response.Status.BAD_REQUEST);
+ }
+ String tableNameWithType =
TableNameBuilder.forType(tableType).tableNameWithType(tableName);
String segmentLineageEntryId = _pinotHelixResourceManager
.startReplaceSegments(tableNameWithType,
startReplaceSegmentsRequest.getSegmentsFrom(),
startReplaceSegmentsRequest.getSegmentsTo());
@@ -565,12 +569,16 @@ public class PinotSegmentUploadDownloadRestletResource {
@ApiOperation(value = "End to replace segments", notes = "End to replace
segments")
public Response endReplaceSegments(
@ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
- @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String
tableTypeStr,
- @ApiParam(value = "Segment lineage entry id returned by
startReplaceSegments API")
+ @ApiParam(value = "OFFLINE|REALTIME", required = true)
@QueryParam("type") String tableTypeStr,
+ @ApiParam(value = "Segment lineage entry id returned by
startReplaceSegments API", required = true)
@QueryParam("segmentLineageEntryId") String segmentLineageEntryId) {
try {
- String tableNameWithType =
-
TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName);
+ TableType tableType = Constants.validateTableType(tableTypeStr);
+ if (tableType == null) {
+ throw new ControllerApplicationException(LOGGER, "Table type should
either be offline or realtime",
+ Response.Status.BAD_REQUEST);
+ }
+ String tableNameWithType =
TableNameBuilder.forType(tableType).tableNameWithType(tableName);
// Check that the segment lineage entry id is valid
Preconditions.checkNotNull(segmentLineageEntryId,
"'segmentLineageEntryId' should not be null");
_pinotHelixResourceManager.endReplaceSegments(tableNameWithType,
segmentLineageEntryId);
@@ -580,6 +588,34 @@ public class PinotSegmentUploadDownloadRestletResource {
}
}
+ @POST
+ @Path("segments/{tableName}/revertReplaceSegments")
+ @Authenticate(AccessType.UPDATE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Revert segments replacement", notes = "Revert
segments replacement")
+ public Response revertReplaceSegments(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
+ @ApiParam(value = "OFFLINE|REALTIME", required = true)
@QueryParam("type") String tableTypeStr,
+ @ApiParam(value = "Segment lineage entry id to revert", required = true)
+ @QueryParam("segmentLineageEntryId") String segmentLineageEntryId,
+ @ApiParam(value = "Force revert in case the user knows that the lineage
entry is interrupted")
+ @QueryParam("forceRevert") @DefaultValue("false") boolean forceRevert) {
+ try {
+ TableType tableType = Constants.validateTableType(tableTypeStr);
+ if (tableType == null) {
+ throw new ControllerApplicationException(LOGGER, "Table type should
either be offline or realtime",
+ Response.Status.BAD_REQUEST);
+ }
+ String tableNameWithType =
TableNameBuilder.forType(tableType).tableNameWithType(tableName);
+ // Check that the segment lineage entry id is valid
+ Preconditions.checkNotNull(segmentLineageEntryId,
"'segmentLineageEntryId' should not be null");
+ _pinotHelixResourceManager.revertReplaceSegments(tableNameWithType,
segmentLineageEntryId, forceRevert);
+ return Response.ok().build();
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
private File createSegmentFileFromMultipart(FormDataMultiPart multiPart,
File dstFile)
throws IOException {
// Read segment file or segment metadata file and directly use that
information to update zk
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 38ad8b9..71d3ede 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2787,11 +2787,15 @@ public class PinotHelixResourceManager {
for (String entryId : segmentLineage.getLineageEntryIds()) {
LineageEntry lineageEntry = segmentLineage.getLineageEntry(entryId);
- // Check that any segment from 'segmentsFrom' does not appear twice.
-
Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(),
segmentsFrom), String.format(
- "It is not allowed to merge segments that are already merged.
(tableName = %s, segmentsFrom from "
- + "existing lineage entry = %s, requested segmentsFrom =
%s)", tableNameWithType,
- lineageEntry.getSegmentsFrom(), segmentsFrom));
+ // If segment entry is in 'REVERTED' state, no need to check for
'segmentsFrom'.
+ if (lineageEntry.getState() != LineageEntryState.REVERTED) {
+ // Check that any segment from 'segmentsFrom' does not appear
twice.
+
Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(),
segmentsFrom), String
+ .format(
+ "It is not allowed to merge segments that are already
merged. (tableName = %s, segmentsFrom from "
+ + "existing lineage entry = %s, requested segmentsFrom
= %s)", tableNameWithType,
+ lineageEntry.getSegmentsFrom(), segmentsFrom));
+ }
// Check that merged segments name cannot be the same.
Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(),
segmentsTo), String.format(
@@ -2907,6 +2911,84 @@ public class PinotHelixResourceManager {
tableNameWithType, segmentLineageEntryId);
}
+ /**
+ * Revert the segment replacement
+ *
+ * 1. Compute validation
+ * 2. Update the lineage entry state to "REVERTED" and write metadata to the
property store
+ *
+ * Update is done with retry logic along with read-modify-write block for
achieving atomic update of the lineage
+ * metadata.
+ *
+ * @param tableNameWithType
+ * @param segmentLineageEntryId
+ */
+ public void revertReplaceSegments(String tableNameWithType, String
segmentLineageEntryId, boolean forceRevert) {
+ try {
+ DEFAULT_RETRY_POLICY.attempt(() -> {
+ // Fetch the segment lineage metadata
+ ZNRecord segmentLineageZNRecord =
+
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore,
tableNameWithType);
+ Preconditions.checkArgument(segmentLineageZNRecord != null, String
+ .format("Segment lineage does not exist. (tableNameWithType =
'%s', segmentLineageEntryId = '%s')",
+ tableNameWithType, segmentLineageEntryId));
+ SegmentLineage segmentLineage =
SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+ int expectedVersion = segmentLineageZNRecord.getVersion();
+
+ // Look up the lineage entry based on the segment lineage entry id
+ LineageEntry lineageEntry =
segmentLineage.getLineageEntry(segmentLineageEntryId);
+ Preconditions.checkArgument(lineageEntry != null, String
+ .format("Invalid segment lineage entry id (tableName='%s',
segmentLineageEntryId='%s')", tableNameWithType,
+ segmentLineageEntryId));
+
+ if (lineageEntry.getState() != LineageEntryState.COMPLETED) {
+ // We do not allow to revert the lineage entry with 'REVERTED'
state. For 'IN_PROGRESS", we only allow to
+ // revert when 'forceRevert' is set to true.
+ if (lineageEntry.getState() != LineageEntryState.IN_PROGRESS ||
!forceRevert) {
+ LOGGER.warn("Lineage state is not valid. Cannot revert the lineage
entry. (tableNameWithType={}, "
+ + "segmentLineageEntryId={}, segmentLineageEntrySate={},
forceRevert={})", tableNameWithType,
+ segmentLineageEntryId, lineageEntry.getState(), forceRevert);
+ return false;
+ }
+ }
+
+ // Check that all segments from 'segmentsFrom' are in ONLINE state in
the external view.
+ Set<String> onlineSegments =
getOnlineSegmentsFromExternalView(tableNameWithType);
+
Preconditions.checkArgument(onlineSegments.containsAll(lineageEntry.getSegmentsFrom()),
String.format(
+ "Not all segments from 'segmentFrom' are in ONLINE state in the
external view. (tableName = '%s', "
+ + "segmentsFrom = '%s', onlineSegments = '%s'",
tableNameWithType, lineageEntry.getSegmentsFrom(),
+ onlineSegments));
+
+ // Update lineage entry
+ LineageEntry newLineageEntry =
+ new LineageEntry(lineageEntry.getSegmentsFrom(),
lineageEntry.getSegmentsTo(), LineageEntryState.REVERTED,
+ System.currentTimeMillis());
+ segmentLineage.updateLineageEntry(segmentLineageEntryId,
newLineageEntry);
+
+ // Write back to the lineage entry
+ if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore,
segmentLineage, expectedVersion)) {
+ // If the segment lineage metadata is successfully updated, we need
to trigger brokers to rebuild the
+ // routing table because it is possible that there has been no EV
change but the routing result may be
+ // different after updating the lineage entry.
+ sendRoutingTableRebuildMessage(tableNameWithType);
+ return true;
+ } else {
+ return false;
+ }
+ });
+ } catch (Exception e) {
+ String errorMsg = String
+ .format("Failed to update the segment lineage. (tableName = %s,
segmentLineageEntryId = %s)",
+ tableNameWithType, segmentLineageEntryId);
+ LOGGER.error(errorMsg, e);
+ throw new RuntimeException(errorMsg, e);
+ }
+
+ // Only successful attempt can reach here
+ LOGGER.info("revertReplaceSegments is successfully processed.
(tableNameWithType = {}, segmentLineageEntryId = {})",
+ tableNameWithType, segmentLineageEntryId);
+ }
+
private void waitForSegmentsBecomeOnline(String tableNameWithType,
Set<String> segmentsToCheck)
throws InterruptedException, TimeoutException {
long endTimeMs = System.currentTimeMillis() +
EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 67d2d47..2512c78 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -229,20 +229,21 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
// If the lineage state is 'COMPLETED', it is safe to delete all
segments from 'segmentsFrom'
segmentsToDelete.addAll(sourceSegments);
}
- } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS)
{
- // If the lineage state is 'IN_PROGRESS', we need to clean up the
zombie lineage entry and its segments
- if (lineageEntry.getTimestamp() < System.currentTimeMillis() -
LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS) {
- Set<String> destinationSegments = new
HashSet<>(lineageEntry.getSegmentsTo());
- destinationSegments.retainAll(segmentsForTable);
- if (destinationSegments.isEmpty()) {
- // If the lineage state is 'IN_PROGRESS' and source segments
are already removed, it is safe to clean up
- // the lineage entry. Deleting lineage will allow the task
scheduler to re-schedule the source segments
- // to be merged again.
- segmentLineage.deleteLineageEntry(lineageEntryId);
- } else {
- // If the lineage state is 'IN_PROGRESS', it is safe to delete
all segments from 'segmentsTo'
- segmentsToDelete.addAll(destinationSegments);
- }
+ } else if (lineageEntry.getState() == LineageEntryState.REVERTED || (
+ lineageEntry.getState() == LineageEntryState.IN_PROGRESS &&
lineageEntry.getTimestamp()
+ < System.currentTimeMillis() -
LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS)) {
+ // If the lineage state is 'IN_PROGRESS' or 'REVERTED', we need to
clean up the zombie lineage
+ // entry and its segments
+ Set<String> destinationSegments = new
HashSet<>(lineageEntry.getSegmentsTo());
+ destinationSegments.retainAll(segmentsForTable);
+ if (destinationSegments.isEmpty()) {
+ // If the lineage state is 'IN_PROGRESS or REVERTED' and source
segments are already removed, it is safe
+ // to clean up the lineage entry. Deleting lineage will allow
the task scheduler to re-schedule the source
+ // segments to be merged again.
+ segmentLineage.deleteLineageEntry(lineageEntryId);
+ } else {
+ // If the lineage state is 'IN_PROGRESS', it is safe to delete
all segments from 'segmentsTo'
+ segmentsToDelete.addAll(destinationSegments);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]