This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 33bb182c4d9 [controller] Avoid eager IdealState fetch in LLC segment commit (#17707)
33bb182c4d9 is described below
commit 33bb182c4d9d126b61004250e8201b628cb89bd0
Author: Xiang Fu <[email protected]>
AuthorDate: Thu Feb 19 16:26:20 2026 -0800
[controller] Avoid eager IdealState fetch in LLC segment commit (#17707)
[controller] Add LLC commit-path edge-case coverage
---
.../realtime/PinotLLCRealtimeSegmentManager.java | 155 ++++++++++++++------
.../PinotLLCRealtimeSegmentManagerTest.java | 158 ++++++++++++++++++++-
2 files changed, 267 insertions(+), 46 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 55252182dd6..3e4f2754e5e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -51,6 +51,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
@@ -655,10 +656,6 @@ public class PinotLLCRealtimeSegmentManager {
String committingSegmentName = committingSegmentDescriptor.getSegmentName();
TableConfig tableConfig = getTableConfig(realtimeTableName);
InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
- IdealState idealState = getIdealState(realtimeTableName);
- Preconditions.checkState(
- idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
- "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
/*
* Update zookeeper in 3 steps.
@@ -680,7 +677,7 @@ public class PinotLLCRealtimeSegmentManager {
// Step-2: Create new segment metadata if needed
long startTimeNs2 = System.nanoTime();
String newConsumingSegmentName =
- createNewSegmentMetadata(tableConfig, idealState, committingSegmentDescriptor, committingSegmentZKMetadata,
+ createNewSegmentMetadata(tableConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
instancePartitions);
preProcessCommitIdealStateUpdate();
@@ -689,13 +686,34 @@ public class PinotLLCRealtimeSegmentManager {
LOGGER.info("Updating Idealstate for previous: {} and new segment: {}",
committingSegmentName,
newConsumingSegmentName);
long startTimeNs3 = System.nanoTime();
+ Map<String, Map<String, String>> instanceStatesMapAfterStep3 =
Collections.emptyMap();
+ boolean newConsumingSegmentInIdealState = false;
// When multiple segments of the same table complete around the same time
it is possible that
// the idealstate update fails due to contention. We serialize the updates
to the idealstate
// to reduce this contention. We may still contend with RetentionManager,
or other updates
// to idealstate from other controllers, but then we have the retry
mechanism to get around that.
- idealState =
- updateIdealStateForSegments(tableConfig, committingSegmentName,
newConsumingSegmentName, instancePartitions);
+ try {
+ IdealState idealState =
+ updateIdealStateForSegments(tableConfig, committingSegmentName,
newConsumingSegmentName, instancePartitions);
+ instanceStatesMapAfterStep3 = idealState.getRecord().getMapFields();
+ if (newConsumingSegmentName != null) {
+ newConsumingSegmentInIdealState =
instanceStatesMapAfterStep3.containsKey(newConsumingSegmentName);
+ if (!newConsumingSegmentInIdealState) {
+ LOGGER.info(
+ "Cleaning up segment ZK metadata for new consuming segment {} of
table {} because it was not added to "
+ + "IdealState. This can happen when table/topic consumption
is paused.",
+ newConsumingSegmentName, realtimeTableName);
+ removeSegmentZKMetadataBestEffort(realtimeTableName,
newConsumingSegmentName);
+ newConsumingSegmentName = null;
+ }
+ }
+ } catch (RuntimeException e) {
+ if (newConsumingSegmentName != null) {
+ removeSegmentZKMetadataBestEffort(realtimeTableName,
newConsumingSegmentName);
+ }
+ throw e;
+ }
long endTimeNs = System.nanoTime();
LOGGER.info(
@@ -717,8 +735,8 @@ public class PinotLLCRealtimeSegmentManager {
_metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig);
// Handle segment movement if necessary
- if (newConsumingSegmentName != null) {
- handleSegmentMovement(realtimeTableName, idealState.getRecord().getMapFields(), committingSegmentName,
+ if (newConsumingSegmentInIdealState) {
+ handleSegmentMovement(realtimeTableName, instanceStatesMapAfterStep3, committingSegmentName,
newConsumingSegmentName);
}
}
@@ -797,7 +815,7 @@ public class PinotLLCRealtimeSegmentManager {
// Step 2: Create new segment metadata
@Nullable
- private String createNewSegmentMetadata(TableConfig tableConfig, IdealState idealState,
+ private String createNewSegmentMetadata(TableConfig tableConfig,
CommittingSegmentDescriptor committingSegmentDescriptor, SegmentZKMetadata committingSegmentZKMetadata,
InstancePartitions instancePartitions) {
String committingSegmentName = committingSegmentDescriptor.getSegmentName();
@@ -806,40 +824,62 @@ public class PinotLLCRealtimeSegmentManager {
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
String newConsumingSegmentName = null;
- if (!isTablePaused(idealState) && !isTopicPaused(idealState, committingSegmentName)) {
- LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
- int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId();
-
- List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
- Set<Integer> partitionIds = getPartitionIds(streamConfigs, idealState);
-
- if (partitionIds.contains(committingSegmentPartitionGroupId)) {
- String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
- long newSegmentCreationTimeMs = getCurrentTimeMs();
- LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
- committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
-
- StreamConfig streamConfig =
- IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs, committingSegmentPartitionGroupId);
- createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
- committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(),
- numReplicas);
- newConsumingSegmentName = newLLCSegment.getSegmentName();
- LOGGER.info("Created new segment metadata for segment: {} with status: {}.", newConsumingSegmentName,
- Status.IN_PROGRESS);
- } else {
- LOGGER.info(
- "Skipping creation of new segment metadata after segment: {} during commit. Reason: Partition ID: {} not "
- + "found in upstream metadata.", committingSegmentName, committingSegmentPartitionGroupId);
+ LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
+ int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId();
+
+ List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
+ PartitionIdsWithIdealState partitionIdsWithIdealState = getPartitionIdsWithIdealState(streamConfigs,
+ () -> getIdealState(realtimeTableName));
+ Set<Integer> partitionIds = partitionIdsWithIdealState._partitionIds;
+
+ if (partitionIds.contains(committingSegmentPartitionGroupId)) {
+ IdealState idealState = partitionIdsWithIdealState._idealState;
+ if (idealState == null) {
+ idealState = getIdealState(realtimeTableName);
}
+ if (idealState != null
+ && (isTablePaused(idealState) || isTopicPaused(idealState, committingSegmentName))) {
+ LOGGER.info("Skipping creation of new segment metadata after segment: {} during commit. Reason: table/topic is "
+ + "paused.", committingSegmentName);
+ return null;
+ }
+ String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
+ long newSegmentCreationTimeMs = getCurrentTimeMs();
+ LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
+ committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
+
+ StreamConfig streamConfig =
+ IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs, committingSegmentPartitionGroupId);
+ createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
+ committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(),
+ numReplicas);
+ newConsumingSegmentName = newLLCSegment.getSegmentName();
+ LOGGER.info("Created new segment metadata for segment: {} with status: {}.", newConsumingSegmentName,
+ Status.IN_PROGRESS);
} else {
LOGGER.info(
- "Skipping creation of new segment metadata after segment: {} during commit. Reason: table: {} is paused.",
- committingSegmentName, realtimeTableName);
+ "Skipping creation of new segment metadata after segment: {} during commit. Reason: Partition ID: {} not "
+ + "found in upstream metadata.", committingSegmentName, committingSegmentPartitionGroupId);
}
return newConsumingSegmentName;
}
+ private void removeSegmentZKMetadataBestEffort(String realtimeTableName, String segmentName) {
+ String segmentMetadataPath =
+ ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, segmentName);
+ try {
+ if (!_propertyStore.remove(segmentMetadataPath, AccessOption.PERSISTENT)) {
+ LOGGER.warn("Failed to remove segment ZK metadata for segment: {} of table: {}", segmentName,
+ realtimeTableName);
+ } else {
+ LOGGER.debug("Removed segment ZK metadata for segment: {} of table: {}", segmentName, realtimeTableName);
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while removing segment ZK metadata for segment: {} of table: {}", segmentName,
+ realtimeTableName, e);
+ }
+ }
+
// Step 3: Update IdealState
private IdealState updateIdealStateForSegments(TableConfig tableConfig, String committingSegmentName,
String newConsumingSegmentName, InstancePartitions instancePartitions) {
@@ -1137,6 +1177,22 @@ public class PinotLLCRealtimeSegmentManager {
@VisibleForTesting
Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState idealState) {
+ return getPartitionIdsWithIdealState(streamConfigs, () -> idealState)._partitionIds;
+ }
+
+ private static class PartitionIdsWithIdealState {
+ private final Set<Integer> _partitionIds;
+ @Nullable
+ private final IdealState _idealState;
+
+ private PartitionIdsWithIdealState(Set<Integer> partitionIds, @Nullable IdealState idealState) {
+ _partitionIds = partitionIds;
+ _idealState = idealState;
+ }
+ }
+
+ private PartitionIdsWithIdealState getPartitionIdsWithIdealState(List<StreamConfig> streamConfigs,
+ Supplier<IdealState> idealStateSupplier) {
Set<Integer> partitionIds = new HashSet<>();
boolean allPartitionIdsFetched = true;
int numStreams = streamConfigs.size();
@@ -1176,6 +1232,7 @@ public class PinotLLCRealtimeSegmentManager {
// If it is failing to fetch partition ids from stream (usually transient due to stream metadata service outage),
// we need to use the existing partition information from ideal state to keep same ingestion behavior.
if (!allPartitionIdsFetched) {
+ IdealState idealState = idealStateSupplier.get();
LOGGER.info(
"Fetch partition ids from Stream incomplete, merge fetched partitionIds with partition group metadata "
+ "for: {}", idealState.getId());
@@ -1188,8 +1245,9 @@ public class PinotLLCRealtimeSegmentManager {
partitionIds.addAll(newPartitionGroupMetadataList.stream()
.map(PartitionGroupMetadata::getPartitionGroupId)
.collect(Collectors.toSet()));
+ return new PartitionIdsWithIdealState(partitionIds, idealState);
}
- return partitionIds;
+ return new PartitionIdsWithIdealState(partitionIds, null);
}
/**
@@ -1428,9 +1486,9 @@ public class PinotLLCRealtimeSegmentManager {
throw new HelixHelper.PermanentUpdaterException(
"Exceeded max segment completion time for segment " +
committingSegmentName);
}
-
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
committingSegmentName,
- isTablePaused(idealState) || isTopicPaused(idealState,
committingSegmentName) ? null : newSegmentName,
- segmentAssignment, instancePartitionsMap);
+
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
committingSegmentName,
+ isTablePaused(idealState) || isTopicPaused(idealState,
committingSegmentName), newSegmentName,
+ segmentAssignment, instancePartitionsMap);
return idealState;
};
if (_controllerConf.getSegmentCompletionGroupCommitEnabled()) {
@@ -1486,9 +1544,20 @@ public class PinotLLCRealtimeSegmentManager {
void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap,
@Nullable String committingSegmentName, @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+ updateInstanceStatesForNewConsumingSegment(instanceStatesMap, committingSegmentName, false, newSegmentName,
+ segmentAssignment, instancePartitionsMap);
+ }
+
+ private void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap,
+ @Nullable String committingSegmentName, boolean isTableOrTopicPaused, @Nullable String newSegmentName,
+ SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
if (committingSegmentName != null) {
// Change committing segment state to ONLINE
- Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet();
+ Map<String, String> committingSegmentInstanceStateMap = instanceStatesMap.get(committingSegmentName);
+ Preconditions.checkState(committingSegmentInstanceStateMap != null && committingSegmentInstanceStateMap
+ .containsValue(SegmentStateModel.CONSUMING),
+ "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
+ Set<String> instances = committingSegmentInstanceStateMap.keySet();
instanceStatesMap.put(committingSegmentName,
SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE));
LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName);
@@ -1501,7 +1570,7 @@ public class PinotLLCRealtimeSegmentManager {
// These conditions can happen again due to manual operations considered as fixes in Issues #5559 and #5263
// The following check prevents the table from going into such a state (but does not prevent the root cause
// of attempting such a zk update).
- if (newSegmentName != null) {
+ if (newSegmentName != null && !isTableOrTopicPaused) {
LLCSegmentName newLLCSegmentName = new LLCSegmentName(newSegmentName);
int partitionId = newLLCSegmentName.getPartitionGroupId();
int seqNum = newLLCSegmentName.getSequenceNumber();
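
For readers skimming the hunks above: the heart of the change is that getPartitionIdsWithIdealState now takes a Supplier<IdealState> and only invokes it on the fallback path, so the common commit path never reads IdealState just to compute partition ids. Below is a minimal, self-contained sketch of that lazy-fallback pattern; the class, field, and method names are illustrative stand-ins, not Pinot's actual API.

// Sketch only: defers an expensive fetch behind a Supplier so it happens solely on the fallback path.
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

class LazyFallbackFetchSketch {

  // Placeholder standing in for org.apache.helix.model.IdealState.
  static final class IdealState {
  }

  static final class Result {
    final Set<Integer> _partitionIds;
    final IdealState _idealStateOrNull; // non-null only if the supplier was actually invoked

    Result(Set<Integer> partitionIds, IdealState idealStateOrNull) {
      _partitionIds = partitionIds;
      _idealStateOrNull = idealStateOrNull;
    }
  }

  static Result getPartitionIds(Set<Integer> idsFromStream, boolean allIdsFetched,
      Supplier<IdealState> idealStateSupplier) {
    Set<Integer> partitionIds = new HashSet<>(idsFromStream);
    if (!allIdsFetched) {
      // Transient stream-metadata failure: fall back to partition info derived from the
      // lazily fetched ideal state, and hand it back so the caller can reuse it.
      IdealState idealState = idealStateSupplier.get();
      // ... merge partition ids derived from idealState into partitionIds ...
      return new Result(partitionIds, idealState);
    }
    // Common case: no ideal-state read at all.
    return new Result(partitionIds, null);
  }
}
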
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 94f3abba16e..6f05cd437e4 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -1187,6 +1187,155 @@ public class PinotLLCRealtimeSegmentManagerTest {
Assert.assertEquals(segmentZKMetadata.getDownloadUrl(), "");
}
+ @Test
+ public void testCommitSegmentMetadataSkipsIdealStateFetchWhenPartitionIdsAvailable() {
+ PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
+ FakePinotLLCRealtimeSegmentManager segmentManager =
+ spy(new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager));
+ setUpNewTable(segmentManager, 2, 5, 4);
+
+ String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
+ CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
+ new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, "http://control_vip/segments/1");
+ committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+
+ segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+
+ verify(segmentManager, atLeastOnce()).getIdealState(REALTIME_TABLE_NAME);
+ }
+
+ @Test
+ public void testCommitSegmentMetadataFetchesIdealStateWhenPartitionIdsFallbackNeeded() {
+ PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
+ FakePinotLLCRealtimeSegmentManager segmentManager =
+ spy(new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager));
+ setUpNewTable(segmentManager, 2, 5, 4);
+ segmentManager._partitionGroupMetadataList = IntStream.range(0, 4)
+ .mapToObj(partition -> new PartitionGroupMetadata(partition, PARTITION_OFFSET))
+ .collect(Collectors.toList());
+
+ String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
+ CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
+ new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, "http://control_vip/segments/1");
+ committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+
+ segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+
+ verify(segmentManager, atLeastOnce()).getIdealState(REALTIME_TABLE_NAME);
+ }
+
+ @Test
+ public void testCommitSegmentMetadataSkipsCreatingNewMetadataWhenTopicPausedIfPartitionIdsFallbackNeeded() {
+ FakePinotLLCRealtimeSegmentManager segmentManager = spy(new FakePinotLLCRealtimeSegmentManager());
+ setUpNewTable(segmentManager, 2, 5, 4);
+ segmentManager._partitionGroupMetadataList = IntStream.range(0, 4)
+ .mapToObj(partition -> new PartitionGroupMetadata(partition, PARTITION_OFFSET))
+ .collect(Collectors.toList());
+
+ PauseState pauseState =
+ new PauseState(false, PauseState.ReasonCode.ADMINISTRATIVE, "pause-topic-for-test",
+ Long.toString(CURRENT_TIME_MS), Collections.singletonList(0));
+ segmentManager._idealState.getRecord().setSimpleField(PinotLLCRealtimeSegmentManager.PAUSE_STATE,
+ pauseState.toJsonString());
+
+ String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
+ CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
+ new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, "http://control_vip/segments/1");
+ committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+
+ String expectedNewConsumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
+ segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+
+ assertFalse(segmentManager._segmentZKMetadataMap.containsKey(expectedNewConsumingSegment));
+ assertFalse(segmentManager._idealState.getRecord().getMapFields().containsKey(expectedNewConsumingSegment));
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
+ (ZkHelixPropertyStore<ZNRecord>) segmentManager._mockResourceManager.getPropertyStore();
+ verify(propertyStore, never()).remove(anyString(), eq(AccessOption.PERSISTENT));
+ }
+
+ @Test
+ public void testCommitSegmentMetadataCleansUpMetadataWhenTablePaused() {
+ FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager();
+ setUpNewTable(segmentManager, 2, 5, 4);
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
+ (ZkHelixPropertyStore<ZNRecord>) segmentManager._mockResourceManager.getPropertyStore();
+ when(propertyStore.remove(anyString(), eq(AccessOption.PERSISTENT))).thenReturn(true);
+
+ PauseState pauseState = new PauseState(true, PauseState.ReasonCode.ADMINISTRATIVE, "pause-for-test",
+ Long.toString(CURRENT_TIME_MS), Collections.emptyList());
+ segmentManager._idealState.getRecord().setSimpleField(PinotLLCRealtimeSegmentManager.PAUSE_STATE,
+ pauseState.toJsonString());
+ segmentManager._idealState.getRecord()
+ .setSimpleField(PinotLLCRealtimeSegmentManager.IS_TABLE_PAUSED, Boolean.TRUE.toString());
+
+ String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
+ CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
+ new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, "http://control_vip/segments/1");
+ committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+ segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+
+ String newConsumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
+ assertFalse(segmentManager._idealState.getRecord().getMapFields().containsKey(newConsumingSegment));
+ verify(propertyStore, never()).remove(
+ ZKMetadataProvider.constructPropertyStorePathForSegment(REALTIME_TABLE_NAME, newConsumingSegment),
+ AccessOption.PERSISTENT);
+ }
+
+ @Test
+ public void testCommitSegmentMetadataCleansUpMetadataWhenTopicPaused() {
+ FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager();
+ setUpNewTable(segmentManager, 2, 5, 4);
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
+ (ZkHelixPropertyStore<ZNRecord>) segmentManager._mockResourceManager.getPropertyStore();
+ when(propertyStore.remove(anyString(), eq(AccessOption.PERSISTENT))).thenReturn(true);
+
+ PauseState pauseState =
+ new PauseState(false, PauseState.ReasonCode.ADMINISTRATIVE, "pause-topic-for-test",
+ Long.toString(CURRENT_TIME_MS), Collections.singletonList(0));
+ segmentManager._idealState.getRecord().setSimpleField(PinotLLCRealtimeSegmentManager.PAUSE_STATE,
+ pauseState.toJsonString());
+
+ String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
+ CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
+ new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, "http://control_vip/segments/1");
+ committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+ segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+
+ String newConsumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
+ assertFalse(segmentManager._idealState.getRecord().getMapFields().containsKey(newConsumingSegment));
+ verify(propertyStore, never()).remove(
+ ZKMetadataProvider.constructPropertyStorePathForSegment(REALTIME_TABLE_NAME, newConsumingSegment),
+ AccessOption.PERSISTENT);
+ }
+
+ @Test
+ public void testCommitSegmentMetadataCleansUpMetadataWhenCommittingSegmentNotConsuming() {
+ FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager();
+ setUpNewTable(segmentManager, 2, 5, 4);
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
+ (ZkHelixPropertyStore<ZNRecord>) segmentManager._mockResourceManager.getPropertyStore();
+ when(propertyStore.remove(anyString(), eq(AccessOption.PERSISTENT))).thenReturn(true);
+
+ String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
+ segmentManager._idealState.getRecord().getMapFields().get(committingSegment)
+ .replaceAll((instance, state) -> SegmentStateModel.ONLINE);
+
+ CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
+ new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, "http://control_vip/segments/1");
+ committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+ try {
+ segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+ fail("Expected commitSegmentMetadata to fail when committing segment has no CONSUMING instance");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("CONSUMING"));
+ }
+
+ String newConsumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
+ verify(propertyStore).remove(
+ ZKMetadataProvider.constructPropertyStorePathForSegment(REALTIME_TABLE_NAME, newConsumingSegment),
+ AccessOption.PERSISTENT);
+ }
+
/**
* Test cases for fixing LLC segment by uploading to segment store if missing
*/
@@ -2089,9 +2238,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
IdealState updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName,
String newSegmentName, SegmentAssignment segmentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
- updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), committingSegmentName, null,
- segmentAssignment, instancePartitionsMap);
- updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), null, newSegmentName,
+ Map<String, String> committingSegmentInstanceStateMap = _idealState.getInstanceStateMap(committingSegmentName);
+ Preconditions.checkState(committingSegmentInstanceStateMap != null && committingSegmentInstanceStateMap
+ .containsValue(SegmentStateModel.CONSUMING),
+ "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
+ updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), committingSegmentName,
+ isTablePaused(_idealState) || isTopicPaused(_idealState, committingSegmentName) ? null : newSegmentName,
segmentAssignment, instancePartitionsMap);
return _idealState;
}
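
Taken together, the change above makes commit Step-3 responsible for cleaning up the Step-2 segment metadata whenever the new consuming segment does not make it into IdealState, either because the update threw or because consumption is paused. A compact, self-contained sketch of that control flow follows; MetadataStore, removeBestEffort, and the path handling are hypothetical stand-ins, not the Pinot classes touched by this commit.

// Sketch of the commit Step-3 cleanup flow under the assumptions noted above.
import java.util.Map;

class CommitStep3Sketch {

  interface MetadataStore {
    // Returns the instance-state map of the updated IdealState; may throw on contention.
    Map<String, Map<String, String>> updateIdealState(String committingSegment, String newConsumingSegment);

    boolean remove(String path); // best-effort delete of segment metadata
  }

  /** Returns the consuming segment name that actually landed in IdealState, or null. */
  static String commitStep3(MetadataStore store, String table, String committingSegment, String newConsumingSegment) {
    try {
      Map<String, Map<String, String>> instanceStates = store.updateIdealState(committingSegment, newConsumingSegment);
      if (newConsumingSegment != null && !instanceStates.containsKey(newConsumingSegment)) {
        // Not added to IdealState (e.g. table/topic consumption paused): drop the metadata created in Step-2.
        removeBestEffort(store, table + "/" + newConsumingSegment);
        return null;
      }
      return newConsumingSegment;
    } catch (RuntimeException e) {
      // The IdealState update failed outright: clean up the Step-2 metadata before rethrowing.
      if (newConsumingSegment != null) {
        removeBestEffort(store, table + "/" + newConsumingSegment);
      }
      throw e;
    }
  }

  private static void removeBestEffort(MetadataStore store, String path) {
    try {
      if (!store.remove(path)) {
        System.err.println("Failed to remove segment metadata at " + path);
      }
    } catch (Exception e) {
      System.err.println("Error removing segment metadata at " + path + ": " + e);
    }
  }
}
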
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]