This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a209f9b0b1 Adding a new list in the propertystore to capture the
committing segments. (#15016)
a209f9b0b1 is described below
commit a209f9b0b16c38e2c1559944b86af6a56ba3a70a
Author: 9aman <[email protected]>
AuthorDate: Wed Feb 12 15:21:32 2025 +0530
Adding a new list in the propertystore to capture the committing segments.
(#15016)
* Adding a new list in the propertystore to capture the committing segments.
This will make it quicker to fetch the committing segments without going
through each segment ZK metadata
* Minor improvements and fixing checkstyle violations
* Adding debugInfo API to fetch committing segments
* Add support for adding missing committing segments due to failure in
adding them during commit start protocol
* Running addition and deletion of segments from the list in the same ZK
call. This aims to simplify the code
* Improving logs for debuggability
---
.../pinot/common/metadata/ZKMetadataProvider.java | 5 +
.../api/resources/PinotRealtimeTableResource.java | 59 ++++++
.../realtime/PinotLLCRealtimeSegmentManager.java | 209 ++++++++++++++++++++-
.../RealtimeSegmentValidationManager.java | 23 +++
.../PinotLLCRealtimeSegmentManagerTest.java | 171 +++++++++++++++++
5 files changed, 460 insertions(+), 7 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 0729533871..37ba1499e3 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -66,6 +66,7 @@ public class ZKMetadataProvider {
private static final String CLUSTER_APPLICATION_QUOTAS = "applicationQuotas";
private static final String PROPERTYSTORE_CONTROLLER_JOBS_PREFIX =
"/CONTROLLER_JOBS";
private static final String PROPERTYSTORE_SEGMENTS_PREFIX = "/SEGMENTS";
+ private static final String PROPERTYSTORE_PAUSELESS_DEBUG_METADATA_PREFIX =
"/PAUSELESS_DEBUG_METADATA";
private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS";
private static final String PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX =
"/INSTANCE_PARTITIONS";
private static final String PROPERTYSTORE_DATABASE_CONFIGS_PREFIX =
"/CONFIGS/DATABASE";
@@ -246,6 +247,10 @@ public class ZKMetadataProvider {
return StringUtil.join("/", PROPERTYSTORE_SEGMENTS_PREFIX, resourceName,
segmentName);
}
+ public static String
constructPropertyStorePathForPauselessDebugMetadata(String resourceName) {
+ return StringUtil.join("/", PROPERTYSTORE_PAUSELESS_DEBUG_METADATA_PREFIX,
resourceName);
+ }
+
public static String constructPropertyStorePathForSchema(String schemaName) {
return StringUtil.join("/", PROPERTYSTORE_SCHEMAS_PREFIX, schemaName);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index e69de66acb..ab9cfe3035 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -19,6 +19,7 @@
package org.apache.pinot.controller.api.resources;
import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
import io.swagger.annotations.ApiOperation;
@@ -29,6 +30,7 @@ import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -46,6 +48,7 @@ import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.utils.DatabaseUtils;
@@ -318,6 +321,62 @@ public class PinotRealtimeTableResource {
}
}
+  /**
+   * Returns debug information for a pauseless realtime table as a pretty-printed JSON object with two entries:
+   * {@code instanceToErrorSegmentsMap} (segments whose external view state is ERROR, grouped by instance) and
+   * {@code committingSegments} (the committing segments reported by the LLC realtime segment manager).
+   *
+   * @param realtimeTableName realtime table name, with or without the type suffix
+   * @param headers request headers, passed to {@code DatabaseUtils.translateTableName} together with the name
+   * @return the debug info serialized as a JSON string
+   * @throws ControllerApplicationException with status 500 if the name denotes an OFFLINE table or if any of the
+   *         lookups fails
+   */
+  @GET
+  @Path("/tables/{tableName}/pauselessDebugInfo")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_DEBUG_INFO)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Returns state of pauseless table", notes =
+      "Gets the segments that are in error state and segments with COMMITTING status in ZK metadata")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 404, message = "Table not found"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public String getPauslessTableDebugInfo(
+      @ApiParam(value = "Realtime table name with or without type", required = true, example = "myTable | "
+          + "myTable_REALTIME") @PathParam("tableName") String realtimeTableName,
+      @Context HttpHeaders headers) {
+    realtimeTableName = DatabaseUtils.translateTableName(realtimeTableName, headers);
+    try {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+      if (TableType.OFFLINE == tableType) {
+        // Name this endpoint in the message so the resulting error is not mistaken for the consuming-segments API.
+        throw new IllegalStateException("Cannot get pauseless debug info for OFFLINE table: " + realtimeTableName);
+      }
+
+      String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(realtimeTableName);
+
+      Map<String, Object> result = new HashMap<>();
+      result.put("instanceToErrorSegmentsMap", getInstanceToErrorSegmentsMap(tableNameWithType));
+      result.put("committingSegments", _pinotLLCRealtimeSegmentManager.getCommittingSegments(tableNameWithType));
+
+      return JsonUtils.objectToPrettyString(result);
+    } catch (Exception e) {
+      // Every failure, including the OFFLINE-table check above, is reported as an internal server error.
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to get pauseless debug info for table %s. %s", realtimeTableName, e.getMessage()),
+          Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+  /**
+   * Collects, per instance, the segments whose state in the table's external view is ERROR.
+   *
+   * @param tableNameWithType table name with type suffix
+   * @return map from instance name to the set of segments in ERROR state on that instance; instances without error
+   *         segments have no entry
+   * @throws IllegalStateException if the table has no external view
+   */
+  private Map<String, Set<String>> getInstanceToErrorSegmentsMap(String tableNameWithType) {
+    ExternalView externalView = _pinotHelixResourceManager.getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "External view does not exist for table: " + tableNameWithType);
+
+    Map<String, Set<String>> errorSegmentsByInstance = new HashMap<>();
+    for (String segmentName : externalView.getPartitionSet()) {
+      // Walk the (instance, state) pairs directly instead of looking each state up by key.
+      for (Map.Entry<String, String> instanceState : externalView.getStateMap(segmentName).entrySet()) {
+        boolean inErrorState =
+            CommonConstants.Helix.StateModel.SegmentStateModel.ERROR.equals(instanceState.getValue());
+        if (!inErrorState) {
+          continue;
+        }
+        errorSegmentsByInstance.computeIfAbsent(instanceState.getKey(), unused -> new HashSet<>()).add(segmentName);
+      }
+    }
+    return errorSegmentsByInstance;
+  }
+
private void validateTable(String tableNameWithType) {
IdealState idealState =
_pinotHelixResourceManager.getTableIdealState(tableNameWithType);
if (idealState == null) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index e296136975..322401e649 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -36,10 +36,12 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -57,6 +59,7 @@ import org.apache.helix.InstanceType;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.messages.ForceCommitMessage;
@@ -124,6 +127,7 @@ import
org.apache.pinot.spi.utils.retry.AttemptFailureException;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.apache.zookeeper.data.Stat;
+import org.codehaus.commons.nullanalysis.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -152,6 +156,8 @@ public class PinotLLCRealtimeSegmentManager {
public static final String PAUSE_STATE = "pauseState";
private static final Logger LOGGER =
LoggerFactory.getLogger(PinotLLCRealtimeSegmentManager.class);
+ private static final RetryPolicy DEFAULT_RETRY_POLICY =
RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f);
+ public static final String COMMITTING_SEGMENTS = "committingSegments";
private static final int STARTING_SEQUENCE_NUMBER = 0; // Initial sequence
number for new table segments
private static final String METADATA_EVENT_NOTIFIER_PREFIX =
"metadata.event.notifier";
@@ -437,6 +443,66 @@ public class PinotLLCRealtimeSegmentManager {
}
}
+  /**
+   * Adds a segment to the committing segments list kept in the property store for the table. The record is
+   * created if it does not exist yet; otherwise it is rewritten with the version read at the start of the call as
+   * the expected version.
+   *
+   * @param realtimeTableName realtime table name, used to build the property store path and as the record id
+   * @param segmentName name of the segment to add
+   * @return {@code true} if the segment is already listed or the write succeeded; {@code false} if the create/set
+   *         call returned {@code false} or the versioned set hit a {@link ZkBadVersionException}
+   */
+  private boolean addSegmentToCommittingSegmentsList(String realtimeTableName, String segmentName) {
+    String committingSegmentsListPath =
+        ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
+    Stat stat = new Stat();
+    ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, AccessOption.PERSISTENT);
+    int expectedVersion = stat.getVersion();
+    List<String> existingSegments = znRecord != null ? znRecord.getListField(COMMITTING_SEGMENTS) : null;
+    LOGGER.info("Committing segments list size: {} before adding the segment: {}",
+        existingSegments != null ? existingSegments.size() : 0, segmentName);
+
+    // No record for the table yet: create it with this segment as the only entry.
+    if (znRecord == null) {
+      znRecord = new ZNRecord(realtimeTableName);
+      znRecord.setListField(COMMITTING_SEGMENTS, List.of(segmentName));
+      return _propertyStore.create(committingSegmentsListPath, znRecord, AccessOption.PERSISTENT);
+    }
+
+    // Segment already present in the list: nothing to write.
+    if (existingSegments != null && existingSegments.contains(segmentName)) {
+      return true;
+    }
+
+    // Append to a copy rather than to the list held by the record: the stored list may be unmodifiable or
+    // fixed-size (e.g. created via List.of or Arrays.asList), in which case an in-place add() would throw.
+    List<String> updatedSegments = existingSegments != null ? new ArrayList<>(existingSegments) : new ArrayList<>();
+    updatedSegments.add(segmentName);
+    znRecord.setListField(COMMITTING_SEGMENTS, updatedSegments);
+    try {
+      return _propertyStore.set(committingSegmentsListPath, znRecord, expectedVersion, AccessOption.PERSISTENT);
+    } catch (ZkBadVersionException e) {
+      // The record changed since it was read; report failure instead of throwing.
+      // NOTE(review): presumably the caller's retry policy re-runs the operation on false - confirm.
+      return false;
+    }
+  }
+
+  /**
+   * Removes a segment from the committing segments list kept in the property store for the table, rewriting the
+   * record with the version read at the start of the call as the expected version.
+   *
+   * @param realtimeTableName realtime table name, used to build the property store path
+   * @param segmentName name of the segment to remove
+   * @return {@code true} if there was nothing to remove (no record, no list, or segment not listed) or the write
+   *         succeeded; {@code false} if the set call returned {@code false} or hit a
+   *         {@link ZkBadVersionException}
+   */
+  private boolean removeSegmentFromCommittingSegmentsList(String realtimeTableName, String segmentName) {
+    String committingSegmentsListPath =
+        ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
+    Stat stat = new Stat();
+    ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, AccessOption.PERSISTENT);
+    // Read the list once and reuse it for logging, the membership check and the update.
+    List<String> existingSegments = znRecord != null ? znRecord.getListField(COMMITTING_SEGMENTS) : null;
+
+    LOGGER.info("Committing segments list size: {} before removing the segment: {}",
+        existingSegments != null ? existingSegments.size() : 0, segmentName);
+
+    if (existingSegments == null || !existingSegments.contains(segmentName)) {
+      return true;
+    }
+
+    // Remove from a copy rather than from the list held by the record: the stored list may be unmodifiable or
+    // fixed-size (e.g. created via List.of or Arrays.asList), in which case an in-place remove() would throw.
+    List<String> remainingSegments = new ArrayList<>(existingSegments);
+    remainingSegments.remove(segmentName);
+    znRecord.setListField(COMMITTING_SEGMENTS, remainingSegments);
+    try {
+      return _propertyStore.set(committingSegmentsListPath, znRecord, stat.getVersion(), AccessOption.PERSISTENT);
+    } catch (ZkBadVersionException e) {
+      // The record changed since it was read; report failure instead of throwing.
+      // NOTE(review): presumably the caller's retry policy re-runs the operation on false - confirm.
+      return false;
+    }
+  }
+
public IdealState getIdealState(String realtimeTableName) {
try {
IdealState idealState = HelixHelper.getTableIdealState(_helixManager,
realtimeTableName);
@@ -611,6 +677,18 @@ public class PinotLLCRealtimeSegmentManager {
// No-op
}
+ private boolean updateCommittingSegmentsList(String realtimeTableName,
+ Callable<Boolean> operation) {
+ try {
+ DEFAULT_RETRY_POLICY.attempt(operation);
+ } catch (Exception e) {
+ _controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
+ LOGGER.error("Failed to update committing segments list for table: {}",
realtimeTableName, e);
+ return false;
+ }
+ return true;
+ }
+
// Step 1: Update committing segment metadata
private SegmentZKMetadata updateCommittingSegmentMetadata(String
realtimeTableName,
CommittingSegmentDescriptor committingSegmentDescriptor, boolean
isStartMetadata) {
@@ -681,9 +759,10 @@ public class PinotLLCRealtimeSegmentManager {
* the response is sent to the server to build the segment.
* <p>
* This method performs the following actions:
- * 1. Updates the property store segment metadata status from IN_PROGRESS to
COMMITTING.
- * 2. Creates a new property store record for the next consuming segment.
- * 3. Updates the ideal state to mark the new segment as CONSUMING.
+ * 1. Adds the segment to the committing segment list
+ * 2. Updates the property store segment metadata status from IN_PROGRESS to
COMMITTING.
+ * 3. Creates a new property store record for the next consuming segment.
+ * 4. Updates the ideal state to mark the new segment as CONSUMING.
*/
public void commitSegmentStartMetadata(String realtimeTableName,
CommittingSegmentDescriptor committingSegmentDescriptor) {
@@ -693,6 +772,13 @@ public class PinotLLCRealtimeSegmentManager {
try {
_numCompletingSegments.addAndGet(1);
+ LOGGER.info("Adding segment: {} to committing segment list",
+ committingSegmentDescriptor.getSegmentName());
+ if (!updateCommittingSegmentsList(realtimeTableName,
+ () -> addSegmentToCommittingSegmentsList(realtimeTableName,
committingSegmentDescriptor.getSegmentName()))) {
+ LOGGER.error("Failed to update committing segments list for table: {},
segment: {}", realtimeTableName,
+ committingSegmentDescriptor.getSegmentName());
+ }
commitSegmentMetadataInternal(realtimeTableName,
committingSegmentDescriptor, true);
} finally {
_numCompletingSegments.addAndGet(-1);
@@ -701,7 +787,8 @@ public class PinotLLCRealtimeSegmentManager {
/**
* Invoked after the realtime segment has been built and uploaded during
pauseless ingestion.
- * Updates the metadata like CRC, download URL, etc. in the Zookeeper
metadata for the committing segment.
+ * Updates the metadata like CRC, download URL, etc. in the Zookeeper
metadata for the committing segment
+ * and removes the segment from the committing segment list.
*/
public void commitSegmentEndMetadata(String realtimeTableName,
CommittingSegmentDescriptor committingSegmentDescriptor) {
@@ -739,6 +826,14 @@ public class PinotLLCRealtimeSegmentManager {
LOGGER.info("Updating segment ZK metadata for segment: {}",
committingSegmentName);
updateCommittingSegmentMetadata(realtimeTableName,
committingSegmentDescriptor, false);
LOGGER.info("Successfully updated segment metadata for segment: {}",
committingSegmentName);
+ // remove the segment from the committing segment list
+ LOGGER.info("Removing segment: {} from committing segment list",
+ committingSegmentDescriptor.getSegmentName());
+ if (!updateCommittingSegmentsList(realtimeTableName,
+ () -> removeSegmentFromCommittingSegmentsList(realtimeTableName,
committingSegmentName))) {
+ LOGGER.error("Failed to update committing segments list for table: {},
segment: {}", realtimeTableName,
+ committingSegmentDescriptor.getSegmentName());
+ }
} finally {
_numCompletingSegments.addAndGet(-1);
}
@@ -968,7 +1063,7 @@ public class PinotLLCRealtimeSegmentManager {
instanceName);
}
return idealState;
- }, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true);
+ }, DEFAULT_RETRY_POLICY, true);
} catch (Exception e) {
_controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
throw e;
@@ -1086,7 +1181,7 @@ public class PinotLLCRealtimeSegmentManager {
realtimeTableName, isTableEnabled, isTablePaused);
return idealState;
}
- }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true);
+ }, DEFAULT_RETRY_POLICY, true);
}
/**
@@ -1116,7 +1211,7 @@ public class PinotLLCRealtimeSegmentManager {
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
committingSegmentName,
isTablePaused(idealState) ? null : newSegmentName,
segmentAssignment, instancePartitionsMap);
return idealState;
- }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
+ }, DEFAULT_RETRY_POLICY);
}
public static boolean isTablePaused(IdealState idealState) {
@@ -2228,4 +2323,104 @@ public class PinotLLCRealtimeSegmentManager {
}
return segmentsYetToBeCommitted;
}
+
+  /**
+   * Synchronizes the list of committing segments for a realtime table in ZooKeeper by both adding new segments
+   * and removing segments that are no longer in COMMITTING state. This function is designed to be called
+   * periodically to maintain an up-to-date list of actively committing segments.
+   *
+   * The synchronization process works as follows:
+   * 1. For a new table (no existing ZooKeeper record), creates a fresh list with the provided segments
+   * 2. For an existing table, merges the new segments with the currently listed segments while dropping listed
+   *    segments whose ZK metadata is missing or whose status is DONE
+   * 3. Maintains uniqueness of segments using a Set-based deduplication
+   *
+   * The update runs through {@link #updateCommittingSegmentsList}; a version conflict on the write is reported to
+   * it as {@code false}, in the same way as the add/remove helpers of this class do.
+   *
+   * @param realtimeTableName Name of the realtime table whose committing segments list needs to be synchronized
+   * @param committingSegments List of segment names that are currently in COMMITTING state. Must not be null;
+   *                           pass an empty list when there are none
+   * @return true if the synchronization succeeds, false if there's a failure in updating ZooKeeper
+   * @see #getCommittingSegments for the logic that filters out segments no longer in COMMITTING state
+   */
+  public boolean syncCommittingSegments(String realtimeTableName, @NotNull List<String> committingSegments) {
+    return updateCommittingSegmentsList(realtimeTableName, () -> {
+      String committingSegmentsListPath =
+          ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
+
+      // Fetch the committing segments record from the property store.
+      Stat stat = new Stat();
+      ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, AccessOption.PERSISTENT);
+
+      // No record for the table yet: create it from the provided segments.
+      if (znRecord == null) {
+        znRecord = new ZNRecord(realtimeTableName);
+        znRecord.setListField(COMMITTING_SEGMENTS, committingSegments);
+        return _propertyStore.create(committingSegmentsListPath, znRecord, AccessOption.PERSISTENT);
+      }
+
+      Set<String> mergedSegments = new HashSet<>(committingSegments);
+      // Keep the segments that are already listed and have not been deleted or marked DONE.
+      List<String> existingSegments =
+          getCommittingSegments(realtimeTableName, znRecord.getListField(COMMITTING_SEGMENTS));
+      if (existingSegments != null) {
+        mergedSegments.addAll(existingSegments);
+      }
+
+      znRecord.setListField(COMMITTING_SEGMENTS, new ArrayList<>(mergedSegments));
+      try {
+        return _propertyStore.set(committingSegmentsListPath, znRecord, stat.getVersion(), AccessOption.PERSISTENT);
+      } catch (ZkBadVersionException e) {
+        // The record changed since it was read; report failure instead of throwing.
+        // NOTE(review): presumably the retry policy re-runs the operation on false - confirm.
+        return false;
+      }
+    });
+  }
+
+  /**
+   * Narrows a list of segment names taken from the property store down to the ones that still count as
+   * committing. A segment is dropped when either:
+   * 1. Its ZK metadata cannot be found
+   * 2. Its ZK metadata status is DONE
+   *
+   * @param realtimeTableName The name of the realtime table
+   * @param committingSegmentsFromPropertyStore Segment names read from the property store, can be null
+   * @return The segments that passed the filter, in input order, or null if the input is null
+   */
+  @Nullable
+  private List<String> getCommittingSegments(String realtimeTableName,
+      @Nullable List<String> committingSegmentsFromPropertyStore) {
+    if (committingSegmentsFromPropertyStore == null) {
+      return null;
+    }
+
+    List<String> stillCommitting = new ArrayList<>();
+    for (String segmentName : committingSegmentsFromPropertyStore) {
+      SegmentZKMetadata metadata = _helixResourceManager.getSegmentZKMetadata(realtimeTableName, segmentName);
+      boolean metadataExists = metadata != null;
+      if (metadataExists && !Status.DONE.equals(metadata.getStatus())) {
+        stillCommitting.add(segmentName);
+      }
+    }
+    return stillCommitting;
+  }
+
+  /**
+   * Reads the committing segments record of a realtime table from the property store and returns the listed
+   * segments that are still committing, i.e. whose ZK metadata exists and is not in DONE status.
+   *
+   * @param realtimeTableName The name of the realtime table to fetch committing segments for
+   * @return The filtered segment names, or null if the record does not exist or has no COMMITTING_SEGMENTS list
+   *         field
+   */
+  public List<String> getCommittingSegments(String realtimeTableName) {
+    String recordPath = ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
+
+    // The Stat is only needed to satisfy the property store call; its contents are not used here.
+    ZNRecord committingSegmentsRecord = _propertyStore.get(recordPath, new Stat(), AccessOption.PERSISTENT);
+    if (committingSegmentsRecord == null) {
+      return null;
+    }
+
+    List<String> listedSegments = committingSegmentsRecord.getListField(COMMITTING_SEGMENTS);
+    if (listedSegments == null) {
+      return null;
+    }
+    return getCommittingSegments(realtimeTableName, listedSegments);
+  }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 5ea86bd9fb..db832aede6 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.validation;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -28,6 +29,7 @@ import
org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.ValidationMetrics;
+import org.apache.pinot.common.utils.PauselessConsumptionUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.api.resources.PauseStatusDetails;
@@ -38,6 +40,7 @@ import org.apache.pinot.spi.config.table.PauseState;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
@@ -176,6 +179,12 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
// Update the total document count gauge
_validationMetrics.updateTotalDocumentCountGauge(realtimeTableName,
computeTotalDocumentCount(segmentsZKMetadata));
+ // Ensures all segments in COMMITTING state are properly tracked in
ZooKeeper.
+ // Acts as a recovery mechanism for segments that may have failed to
register during start of commit protocol.
+ if (PauselessConsumptionUtils.isPauselessEnabled(tableConfig)) {
+ syncCommittingSegmentsFromMetadata(realtimeTableName,
segmentsZKMetadata);
+ }
+
// Check missing segments and upload them to the deep store
if (_llcRealtimeSegmentManager.isDeepStoreLLCSegmentUploadRetryEnabled()) {
_llcRealtimeSegmentManager.uploadToDeepStoreIfMissing(tableConfig,
segmentsZKMetadata);
@@ -186,6 +195,20 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
}
}
+  /**
+   * Collects the names of all segments whose ZK metadata status is COMMITTING and hands them to the LLC realtime
+   * segment manager to synchronize the committing segments list. A failed synchronization is only logged.
+   *
+   * @param realtimeTableName realtime table name
+   * @param segmentsZKMetadata ZK metadata of the table's segments
+   */
+  private void syncCommittingSegmentsFromMetadata(String realtimeTableName,
+      List<SegmentZKMetadata> segmentsZKMetadata) {
+    List<String> committingSegments = new ArrayList<>();
+    for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
+      if (!CommonConstants.Segment.Realtime.Status.COMMITTING.equals(segmentZKMetadata.getStatus())) {
+        continue;
+      }
+      committingSegments.add(segmentZKMetadata.getSegmentName());
+    }
+    LOGGER.info("Adding committing segments to ZK: {}", committingSegments);
+    boolean synced = _llcRealtimeSegmentManager.syncCommittingSegments(realtimeTableName, committingSegments);
+    if (!synced) {
+      LOGGER.error("Failed to add committing segments for table: {}", realtimeTableName);
+    }
+  }
+
@Override
protected void nonLeaderCleanup(List<String> tableNamesWithType) {
for (String tableNameWithType : tableNamesWithType) {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index fcbb5c4a68..fd1f9d67c6 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -44,6 +45,7 @@ import java.util.stream.IntStream;
import javax.annotation.Nullable;
import javax.ws.rs.core.Response;
import org.apache.commons.io.FileUtils;
+import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.model.ExternalView;
@@ -53,6 +55,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.restlet.resources.TableLLCSegmentUploadResponse;
@@ -94,6 +97,7 @@ import org.testng.annotations.Test;
import static
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION;
import static
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS;
+import static
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager.COMMITTING_SEGMENTS;
import static
org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
@@ -1482,6 +1486,173 @@ public class PinotLLCRealtimeSegmentManagerTest {
assert ImmutableSet.of("s2", "s4", "s5").equals(segmentsYetToBeCommitted);
}
+ /**
+ * Verifies getCommittingSegments(tableName) against a mocked property store and resource manager:
+ * segments with COMMITTING status are returned, segments with missing ZK metadata or DONE status are
+ * filtered out, and null is returned when the record or its COMMITTING_SEGMENTS list field is absent.
+ */
+ @Test
+ public void testGetCommittingSegments()
+ throws HttpErrorStatusException, IOException, URISyntaxException {
+ // mock the behavior for PinotHelixResourceManager
+ PinotHelixResourceManager pinotHelixResourceManager =
mock(PinotHelixResourceManager.class);
+ HelixManager helixManager = mock(HelixManager.class);
+ HelixAdmin helixAdmin = mock(HelixAdmin.class);
+ ZkHelixPropertyStore<ZNRecord> zkHelixPropertyStore =
+ (ZkHelixPropertyStore<ZNRecord>) mock(ZkHelixPropertyStore.class);
+
when(pinotHelixResourceManager.getHelixZkManager()).thenReturn(helixManager);
+ when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+ when(helixManager.getClusterName()).thenReturn(CLUSTER_NAME);
+
when(pinotHelixResourceManager.getPropertyStore()).thenReturn(zkHelixPropertyStore);
+
+ // init fake PinotLLCRealtimeSegmentManager
+ ControllerConf controllerConfig = new ControllerConf();
+ FakePinotLLCRealtimeSegmentManager segmentManager =
+ new FakePinotLLCRealtimeSegmentManager(pinotHelixResourceManager,
controllerConfig);
+
+ // Test table name
+ String realtimeTableName = "githubEvents_2_REALTIME";
+
+ // Create test segments
+ List<String> testSegments = List.of(
+ "githubEvents_2__0__0__20250210T1142Z",
+ "githubEvents_2__0__1__20250210T1142Z",
+ "githubEvents_2__0__2__20250210T1142Z",
+ "githubEvents_2__0__3__20250210T1142Z"
+ );
+
+ // mock response of propertyStore
+ String committingSegmentsListPath =
+
ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
+
+ ZNRecord znRecord = new ZNRecord(realtimeTableName);
+ znRecord.setListField(COMMITTING_SEGMENTS, testSegments);
+
+ when(zkHelixPropertyStore.get(eq(committingSegmentsListPath), any(),
eq(AccessOption.PERSISTENT)))
+ .thenReturn(znRecord);
+
+ // mock response for fetching segmentZKMetadata with different scenarios
+ // Segment 0: COMMITTING status
+ SegmentZKMetadata segmentZKMetadata0 = mock(SegmentZKMetadata.class);
+ when(segmentZKMetadata0.getStatus()).thenReturn(Status.COMMITTING);
+ when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName,
testSegments.get(0)))
+ .thenReturn(segmentZKMetadata0);
+
+ // Segment 1: null metadata (deleted)
+ when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName,
testSegments.get(1)))
+ .thenReturn(null);
+
+ // Segment 2: DONE status
+ SegmentZKMetadata segmentZKMetadata2 = mock(SegmentZKMetadata.class);
+ when(segmentZKMetadata2.getStatus()).thenReturn(Status.DONE);
+ when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName,
testSegments.get(2)))
+ .thenReturn(segmentZKMetadata2);
+
+ // Segment 3: COMMITTING status
+ SegmentZKMetadata segmentZKMetadata3 = mock(SegmentZKMetadata.class);
+ when(segmentZKMetadata3.getStatus()).thenReturn(Status.COMMITTING);
+ when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName,
testSegments.get(3)))
+ .thenReturn(segmentZKMetadata3);
+
+ // Execute test
+ List<String> result =
segmentManager.getCommittingSegments(realtimeTableName);
+
+ // Verify results
+ assertNotNull(result);
+ assertEquals(2, result.size());
+ assertTrue(result.contains(testSegments.get(0))); // Should include
COMMITTING segment
+ assertFalse(result.contains(testSegments.get(1))); // Should exclude null
metadata segment
+ assertFalse(result.contains(testSegments.get(2))); // Should exclude DONE
segment
+ assertTrue(result.contains(testSegments.get(3))); // Should include
COMMITTING segment
+
+ // Test null case: the property store has no record for the table
+ when(zkHelixPropertyStore.get(eq(committingSegmentsListPath), any(),
eq(AccessOption.PERSISTENT)))
+ .thenReturn(null);
+ result = segmentManager.getCommittingSegments(realtimeTableName);
+ assertNull(result);
+
+ // Test record that exists but has no COMMITTING_SEGMENTS list field
+ ZNRecord emptyRecord = new ZNRecord("CommittingSegments");
+ when(zkHelixPropertyStore.get(eq(committingSegmentsListPath), any(),
eq(AccessOption.PERSISTENT)))
+ .thenReturn(emptyRecord);
+ result = segmentManager.getCommittingSegments(realtimeTableName);
+ assertNull(result);
+ }
+
+ /**
+ * Verifies syncCommittingSegments against a mocked property store and resource manager in three steps:
+ * the record is created when none exists, an existing list is merged with the new segments while the
+ * DONE segment is dropped and no duplicates are kept, and false is returned when the set call throws.
+ */
+ @Test
+ public void testSyncCommittingSegments() throws Exception {
+ // Set up mocks for the resource management infrastructure
+ PinotHelixResourceManager pinotHelixResourceManager =
mock(PinotHelixResourceManager.class);
+ HelixManager helixManager = mock(HelixManager.class);
+ HelixAdmin helixAdmin = mock(HelixAdmin.class);
+ ZkHelixPropertyStore<ZNRecord> zkHelixPropertyStore =
+ (ZkHelixPropertyStore<ZNRecord>) mock(ZkHelixPropertyStore.class);
+
+ // Configure basic mock behaviors
+
when(pinotHelixResourceManager.getHelixZkManager()).thenReturn(helixManager);
+ when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+ when(helixManager.getClusterName()).thenReturn(CLUSTER_NAME);
+
when(pinotHelixResourceManager.getPropertyStore()).thenReturn(zkHelixPropertyStore);
+
+ // Initialize the segment manager
+ ControllerConf controllerConfig = new ControllerConf();
+ FakePinotLLCRealtimeSegmentManager segmentManager =
+ new FakePinotLLCRealtimeSegmentManager(pinotHelixResourceManager,
controllerConfig);
+
+ String realtimeTableName = "testTable_REALTIME";
+ String committingSegmentsListPath =
+
ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
+
+
+ // Create test segments with different states
+ String committingSegment1 = "testTable__0__0__20250210T1142Z";
+ String committingSegment2 = "testTable__0__1__20250210T1142Z";
+ String doneSegment = "testTable__0__2__20250210T1142Z";
+
+ // Set up segment metadata mocks
+ SegmentZKMetadata committingMetadata1 = mock(SegmentZKMetadata.class);
+ when(committingMetadata1.getStatus()).thenReturn(Status.COMMITTING);
+
+ SegmentZKMetadata committingMetadata2 = mock(SegmentZKMetadata.class);
+ when(committingMetadata2.getStatus()).thenReturn(Status.COMMITTING);
+
+ SegmentZKMetadata doneMetadata = mock(SegmentZKMetadata.class);
+ when(doneMetadata.getStatus()).thenReturn(Status.DONE);
+
+ when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName,
committingSegment1)).thenReturn(
+ committingMetadata1);
+ when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName,
committingSegment2)).thenReturn(
+ committingMetadata2);
+ when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName,
doneSegment)).thenReturn(doneMetadata);
+
+ // Test 1: Initial creation when the property store has no record yet
+ List<String> newSegments = Arrays.asList(committingSegment1,
committingSegment2);
+ when(zkHelixPropertyStore.get(eq(committingSegmentsListPath), any(),
eq(AccessOption.PERSISTENT)))
+ .thenReturn(null);
+ when(zkHelixPropertyStore.create(eq(committingSegmentsListPath), any(),
eq(AccessOption.PERSISTENT)))
+ .thenReturn(true);
+
+ assertTrue(segmentManager.syncCommittingSegments(realtimeTableName,
newSegments));
+
+ // Test 2: Syncing with existing segments including DONE and missing
metadata
+ ZNRecord existingRecord = new ZNRecord(realtimeTableName);
+ existingRecord.setListField(COMMITTING_SEGMENTS,
+ Arrays.asList(committingSegment2, doneSegment));
+
+ when(zkHelixPropertyStore.get(eq(committingSegmentsListPath), any(),
eq(AccessOption.PERSISTENT)))
+ .thenReturn(existingRecord);
+ when(zkHelixPropertyStore.set(eq(committingSegmentsListPath), any(),
anyInt(), eq(AccessOption.PERSISTENT)))
+ .thenReturn(true);
+
+ // There should not be any duplicates and the doneSegment should be
removed from the list
+ assertTrue(segmentManager.syncCommittingSegments(realtimeTableName,
+ Arrays.asList(committingSegment1, committingSegment2)));
+ assertEquals(new
HashSet<>(existingRecord.getListField(COMMITTING_SEGMENTS)),
+ new HashSet<>(List.of(committingSegment1, committingSegment2)));
+
+
+ // Test 3: Error handling during ZooKeeper operations
+ when(zkHelixPropertyStore.set(eq(committingSegmentsListPath), any(),
anyInt(), eq(AccessOption.PERSISTENT)))
+ .thenThrow(new RuntimeException("ZooKeeper operation failed"));
+ assertFalse(segmentManager.syncCommittingSegments(realtimeTableName,
newSegments));
+ }
+
+
//////////////////////////////////////////////////////////////////////////////////
// Fake classes
/////////////////////////////////////////////////////////////////////////////////
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]