This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cf74f5ad6d Consumes segments in strict order of sequence number
(#15261)
cf74f5ad6d is described below
commit cf74f5ad6dde98157f534e7781cbb177889cddac
Author: NOOB <[email protected]>
AuthorDate: Thu Mar 27 07:12:08 2025 +0530
Consumes segments in strict order of sequence number (#15261)
---
.../apache/pinot/common/metrics/ServerTimer.java | 6 +
.../data/manager/realtime/ConsumerCoordinator.java | 291 +++++++++++++
.../realtime/RealtimeSegmentDataManager.java | 22 +-
.../manager/realtime/RealtimeTableDataManager.java | 61 ++-
.../manager/realtime/ConsumerCoordinatorTest.java | 485 +++++++++++++++++++++
.../realtime/RealtimeSegmentDataManagerTest.java | 19 +-
...FailureInjectingRealtimeSegmentDataManager.java | 19 +-
.../FailureInjectingRealtimeTableDataManager.java | 5 +-
.../table/ingestion/StreamIngestionConfig.java | 27 +-
9 files changed, 891 insertions(+), 44 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index 1618f73047..917a187946 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -63,6 +63,12 @@ public enum ServerTimer implements AbstractMetrics.Timer {
SECONDARY_Q_WAIT_TIME_MS("milliseconds", false,
"Time spent waiting in the secondary queue when BinaryWorkloadScheduler
is used."),
+ PREV_SEGMENT_FETCH_IDEAL_STATE_TIME_MS("milliseconds", false,
+ "Time spent while fetching previous segment from ideal state for any
segment."),
+
+ PREV_SEGMENT_WAIT_TIME_MS("milliseconds", false,
+ "Time spent while waiting on previous segment to be registered."),
+
// Multi-stage
/**
* Time spent building the hash table for the join.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
new file mode 100644
index 0000000000..75ba5f73d4
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerTimer;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The ConsumerCoordinator coordinates the offline->consuming helix
transitions.
+ */
+public class ConsumerCoordinator {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ConsumerCoordinator.class);
+ // Timeout used for each semaphore tryAcquire attempt and for each wait on the condition below.
+ private static final long WAIT_INTERVAL_MS = TimeUnit.MINUTES.toMillis(3);
+
+ // Created with a single permit in the constructor; taken in acquire() and given back in release().
+ private final Semaphore _semaphore;
+ // When true, acquire() first waits for the previous segment before trying to take the semaphore.
+ private final boolean _enforceConsumptionInOrder;
+ // Condition of _lock; signalled by trackSegment() and awaited by the awaitForPreviousSegment* methods.
+ private final Condition _condition;
+ private final Lock _lock;
+ private final ServerMetrics _serverMetrics;
+ // When true, the previous segment is always looked up from ideal state and the watermark below is not updated.
+ private final boolean _alwaysRelyOnIdealState;
+ private final RealtimeTableDataManager _realtimeTableDataManager;
+ // Set to true once the first wait that goes through ideal state has completed.
+ private final AtomicBoolean _firstTransitionProcessed;
+
+ // Highest sequence number passed to trackSegment() so far; -1 until a segment has been tracked.
+ private volatile int _maxSegmentSeqNumRegistered = -1;
+
+  /**
+   * Builds a coordinator bound to the given table data manager.
+   *
+   * @param enforceConsumptionInOrder whether {@link #acquire(LLCSegmentName)} must wait for the previous segment
+   *                                  before taking the consumer semaphore
+   * @param realtimeTableDataManager table data manager used to read the stream ingestion config, look up segments
+   *                                 and fetch the ideal state
+   */
+  public ConsumerCoordinator(boolean enforceConsumptionInOrder, RealtimeTableDataManager realtimeTableDataManager) {
+    _enforceConsumptionInOrder = enforceConsumptionInOrder;
+    _realtimeTableDataManager = realtimeTableDataManager;
+    _serverMetrics = ServerMetrics.get();
+    _firstTransitionProcessed = new AtomicBoolean(false);
+
+    // a single permit: only one caller can hold the consumer semaphore at a time.
+    _semaphore = new Semaphore(1);
+    _lock = new ReentrantLock();
+    _condition = _lock.newCondition();
+
+    // if isUseIdealStateToCalculatePreviousSegment is true, server relies on ideal state to fetch previous segment
+    // to a segment for all helix transitions. Without a stream ingestion config the flag defaults to false.
+    StreamIngestionConfig ingestionConfig = realtimeTableDataManager.getStreamIngestionConfig();
+    _alwaysRelyOnIdealState =
+        ingestionConfig != null && ingestionConfig.isUseIdealStateToCalculatePreviousSegment();
+  }
+
+  /**
+   * Blocks until the consumer semaphore has been acquired for the given segment.
+   * When in-order consumption is enforced, first waits for the previous segment and records the time spent
+   * waiting under {@link ServerTimer#PREV_SEGMENT_WAIT_TIME_MS}.
+   *
+   * @param llcSegmentName segment of the current helix transition
+   * @throws InterruptedException if the thread is interrupted while waiting
+   */
+  public void acquire(LLCSegmentName llcSegmentName)
+      throws InterruptedException {
+    if (_enforceConsumptionInOrder) {
+      long waitStartMs = System.currentTimeMillis();
+      waitForPrevSegment(llcSegmentName);
+      long waitedMs = System.currentTimeMillis() - waitStartMs;
+      _serverMetrics.addTimedTableValue(_realtimeTableDataManager.getTableName(),
+          ServerTimer.PREV_SEGMENT_WAIT_TIME_MS, waitedMs, TimeUnit.MILLISECONDS);
+    }
+
+    // keep retrying in WAIT_INTERVAL_MS slices so that a long wait leaves a trail in the logs.
+    long acquireStartMs = System.currentTimeMillis();
+    for (;;) {
+      if (_semaphore.tryAcquire(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) {
+        return;
+      }
+      LOGGER.warn("Failed to acquire consumer semaphore for segment: {} in: {}ms. Retrying.", llcSegmentName,
+          System.currentTimeMillis() - acquireStartMs);
+    }
+  }
+
+ /**
+ * Returns one permit to the consumer semaphore taken by acquire().
+ */
+ public void release() {
+ _semaphore.release();
+ }
+
+ // Exposes the underlying consumer semaphore.
+ @VisibleForTesting
+ Semaphore getSemaphore() {
+ return _semaphore;
+ }
+
+  /**
+   * Records that the given segment has been registered and wakes up every thread waiting on the condition.
+   * The sequence number watermark is only advanced when the coordinator is not configured to always rely on
+   * ideal state.
+   *
+   * @param llcSegmentName segment that has just been registered
+   */
+  public void trackSegment(LLCSegmentName llcSegmentName) {
+    _lock.lock();
+    try {
+      if (!_alwaysRelyOnIdealState) {
+        int registeredSeqNum = llcSegmentName.getSequenceNumber();
+        _maxSegmentSeqNumRegistered = Math.max(registeredSeqNum, _maxSegmentSeqNumRegistered);
+      }
+      // wake up all helix threads blocked on the previous segment of their offline -> consuming transition.
+      _condition.signalAll();
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Waits for the segment preceding {@code currSegment}.
+   * The sequence number watermark is used once a first transition has been processed and ideal state is not
+   * forced; otherwise, or when the watermark wait times out, the previous segment is resolved from ideal state.
+   *
+   * @param currSegment segment of the current helix transition
+   * @throws InterruptedException if the thread is interrupted while waiting
+   */
+  private void waitForPrevSegment(LLCSegmentName currSegment)
+      throws InterruptedException {
+    if (!_alwaysRelyOnIdealState && _firstTransitionProcessed.get()) {
+      // rely on _maxSegmentSeqNumRegistered watermark for previous segment.
+      boolean prevSegmentRegistered = awaitForPreviousSegmentSequenceNumber(currSegment, WAIT_INTERVAL_MS);
+      if (!prevSegmentRegistered) {
+        // the watermark did not reach the previous sequence number in time;
+        // fall back to resolving the previous segment from ideal state.
+        awaitForPreviousSegmentFromIdealState(currSegment);
+      }
+      return;
+    }
+
+    // either ideal state is always used, or no offline -> consuming transition has been processed yet.
+    awaitForPreviousSegmentFromIdealState(currSegment);
+
+    // the first transition is prone to error: the segment previous to this transition's segment may have been
+    // deleted before this server came alive, so it will never be registered here. That is why the first
+    // transition goes through ideal state and is then recorded in _firstTransitionProcessed.
+    _firstTransitionProcessed.set(true);
+  }
+
+  /**
+   * Resolves the previous segment of {@code currSegment} from ideal state and blocks until that segment can be
+   * acquired from the table data manager. Returns immediately when ideal state yields no previous segment.
+   *
+   * <p>The first lookup of the previous segment is done while holding {@code _lock}. {@link #trackSegment} signals
+   * under the same lock, so a registration that happens between the lookup and the wait cannot be missed; without
+   * this the thread could sleep a full {@code WAIT_INTERVAL_MS} for a segment that is already loaded.
+   *
+   * @param currSegment segment of the current helix transition
+   * @throws InterruptedException if the thread is interrupted while waiting
+   */
+  private void awaitForPreviousSegmentFromIdealState(LLCSegmentName currSegment)
+      throws InterruptedException {
+    String previousSegment = getPreviousSegmentFromIdealState(currSegment);
+    if (previousSegment == null) {
+      // previous segment can only be null if either all the previous segments are deleted or this is the starting
+      // sequence segment of the partition Group.
+      return;
+    }
+
+    long startTimeMs = System.currentTimeMillis();
+    SegmentDataManager segmentDataManager = null;
+    try {
+      _lock.lock();
+      try {
+        // check under the lock so that a concurrent trackSegment() signal is never lost.
+        segmentDataManager = _realtimeTableDataManager.acquireSegment(previousSegment);
+        while (segmentDataManager == null) {
+          // if segmentDataManager == null, it means segment is not loaded in the server.
+          // wait until it's loaded.
+          if (!_condition.await(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) {
+            LOGGER.warn("Semaphore access denied to segment: {}. Waited on previous segment: {} for: {}ms.",
+                currSegment.getSegmentName(), previousSegment, System.currentTimeMillis() - startTimeMs);
+            // waited until timeout, fetch previous segment again from ideal state as previous segment might be
+            // changed in ideal state.
+            previousSegment = getPreviousSegmentFromIdealState(currSegment);
+            if (previousSegment == null) {
+              return;
+            }
+          }
+          segmentDataManager = _realtimeTableDataManager.acquireSegment(previousSegment);
+        }
+      } finally {
+        _lock.unlock();
+      }
+    } finally {
+      // the segment was only acquired to prove it is loaded; give the reference back.
+      if (segmentDataManager != null) {
+        _realtimeTableDataManager.releaseSegment(segmentDataManager);
+      }
+    }
+  }
+
+  /**
+   * Waits until the registered sequence number watermark reaches the sequence number just before
+   * {@code currSegment}, or until the timeout elapses.
+   *
+   * <p>The remaining time is carried across wakeups, so signals for other segments (or spurious wakeups) do not
+   * restart the timeout and the total wait never exceeds {@code timeoutMs}.
+   *
+   * @param currSegment is the segment of current helix transition.
+   * @param timeoutMs is max time to wait in millis
+   * @return true if previous Segment was registered to the server, else false.
+   * @throws InterruptedException if the thread is interrupted while waiting
+   */
+  @VisibleForTesting
+  boolean awaitForPreviousSegmentSequenceNumber(LLCSegmentName currSegment, long timeoutMs)
+      throws InterruptedException {
+    long startTimeMs = System.currentTimeMillis();
+    int prevSeqNum = currSegment.getSequenceNumber() - 1;
+    long remainingNs = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
+    _lock.lock();
+    try {
+      while (_maxSegmentSeqNumRegistered < prevSeqNum) {
+        // it means the previous segment is not loaded in the server. Wait until it's loaded.
+        if (remainingNs <= 0) {
+          LOGGER.warn(
+              "Semaphore access denied to segment: {}. Waited on previous segment with sequence number: {} for: {}ms.",
+              currSegment.getSegmentName(), prevSeqNum, System.currentTimeMillis() - startTimeMs);
+          // waited until the timeout. Rely on ideal state now.
+          return false;
+        }
+        // awaitNanos returns an estimate of the time still left, which becomes the budget for the next wait.
+        remainingNs = _condition.awaitNanos(remainingNs);
+      }
+      return true;
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Looks through the table's segment assignment for the segment that precedes {@code currSegment}: among the
+   * segments that are ONLINE on this server and belong to the same partition group, the one with the highest
+   * sequence number below the current one. The lookup time is reported under
+   * {@link ServerTimer#PREV_SEGMENT_FETCH_IDEAL_STATE_TIME_MS}.
+   *
+   * @param currSegment segment of the current helix transition
+   * @return name of the previous segment, or null when no segment qualifies
+   */
+  @VisibleForTesting
+  @Nullable
+  String getPreviousSegmentFromIdealState(LLCSegmentName currSegment) {
+    long fetchStartMs = System.currentTimeMillis();
+    int targetPartitionGroupId = currSegment.getPartitionGroupId();
+    int targetSeqNum = currSegment.getSequenceNumber();
+    Map<String, Map<String, String>> assignment = getSegmentAssignment();
+    String serverInstanceId = _realtimeTableDataManager.getServerInstance();
+    String onlineState = CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
+
+    // e.g. if seq num of current segment is 102, bestSeqNum ends up as the highest seq num below 102.
+    String bestCandidate = null;
+    int bestSeqNum = -1;
+    for (Map.Entry<String, Map<String, String>> assignmentEntry : assignment.entrySet()) {
+      // the previous segment has to be online in this instance. If no earlier segment is online, the current
+      // helix transition is simply allowed to go ahead.
+      if (!onlineState.equals(assignmentEntry.getValue().get(serverInstanceId))) {
+        continue;
+      }
+
+      String candidateName = assignmentEntry.getKey();
+      LLCSegmentName candidate = LLCSegmentName.of(candidateName);
+      // a null name means an uploaded segment; segments of other partitions are not relevant either.
+      if (candidate == null || candidate.getPartitionGroupId() != targetPartitionGroupId) {
+        continue;
+      }
+
+      // keep the closest predecessor: below the current sequence number and above the best seen so far.
+      int candidateSeqNum = candidate.getSequenceNumber();
+      if (candidateSeqNum < targetSeqNum && candidateSeqNum > bestSeqNum) {
+        bestSeqNum = candidateSeqNum;
+        bestCandidate = candidateName;
+      }
+    }
+
+    long elapsedMs = System.currentTimeMillis() - fetchStartMs;
+    LOGGER.info("Fetched previous segment: {} to current segment: {} in: {}ms.", bestCandidate, currSegment,
+        elapsedMs);
+    _serverMetrics.addTimedTableValue(_realtimeTableDataManager.getTableName(),
+        ServerTimer.PREV_SEGMENT_FETCH_IDEAL_STATE_TIME_MS, elapsedMs, TimeUnit.MILLISECONDS);
+
+    return bestCandidate;
+  }
+
+  /**
+   * Reads the table's ideal state through the helix manager and returns its map fields
+   * (segment name -> instance -> state).
+   *
+   * @throws IllegalStateException if the ideal state cannot be found
+   */
+  @VisibleForTesting
+  Map<String, Map<String, String>> getSegmentAssignment() {
+    String tableName = _realtimeTableDataManager.getTableName();
+    IdealState tableIdealState =
+        HelixHelper.getTableIdealState(_realtimeTableDataManager.getHelixManager(), tableName);
+    Preconditions.checkState(tableIdealState != null, "Failed to find ideal state for table: %s", tableName);
+    return tableIdealState.getRecord().getMapFields();
+  }
+
+ // Exposes the lock that guards the condition used for previous-segment waits.
+ @VisibleForTesting
+ Lock getLock() {
+ return _lock;
+ }
+
+ // Exposes the flag that is set once the first ideal-state based wait has completed.
+ @VisibleForTesting
+ AtomicBoolean getFirstTransitionProcessed() {
+ return _firstTransitionProcessed;
+ }
+
+ // this should not be used outside of tests.
+ // Returns the current value of the _maxSegmentSeqNumRegistered watermark (-1 if nothing was tracked yet).
+ @VisibleForTesting
+ int getMaxSegmentSeqNumLoaded() {
+ return _maxSegmentSeqNumRegistered;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 229799e8d2..9011318891 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -247,7 +247,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
// Semaphore for each partitionGroupId only, which is to prevent two
different stream consumers
// from consuming with the same partitionGroupId in parallel in the same
host.
// See the comments in {@link RealtimeTableDataManager}.
- private final Semaphore _partitionGroupConsumerSemaphore;
+ private final ConsumerCoordinator _consumerCoordinator;
// A boolean flag to check whether the current thread has acquired the
semaphore.
// This boolean is needed because the semaphore is shared by threads; every
thread holding this semaphore can
// modify the permit. This boolean make sure the semaphore gets released
only once when the partition group stops
@@ -1066,7 +1066,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
@VisibleForTesting
Semaphore getPartitionGroupConsumerSemaphore() {
- return _partitionGroupConsumerSemaphore;
+ return _consumerCoordinator.getSemaphore();
}
@VisibleForTesting
@@ -1280,8 +1280,8 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
closePartitionGroupConsumer();
closePartitionMetadataProvider();
if (_acquiredConsumerSemaphore.compareAndSet(true, false)) {
- _segmentLogger.info("Releasing the _partitionGroupConsumerSemaphore");
- _partitionGroupConsumerSemaphore.release();
+ _segmentLogger.info("Releasing the consumer semaphore");
+ _consumerCoordinator.release();
}
}
@@ -1540,7 +1540,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
// If the transition is OFFLINE to ONLINE, the caller should have downloaded
the segment and we don't reach here.
public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata,
TableConfig tableConfig,
RealtimeTableDataManager realtimeTableDataManager, String
resourceDataDir, IndexLoadingConfig indexLoadingConfig,
- Schema schema, LLCSegmentName llcSegmentName, Semaphore
partitionGroupConsumerSemaphore,
+ Schema schema, LLCSegmentName llcSegmentName, ConsumerCoordinator
consumerCoordinator,
ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager
partitionUpsertMetadataManager,
@Nullable PartitionDedupMetadataManager partitionDedupMetadataManager,
BooleanSupplier isReadyToConsumeData)
throws AttemptsExceededException, RetriableOperationException {
@@ -1585,7 +1585,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
_segmentZKMetadata.getEndOffset() == null ? null
:
_streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getEndOffset()),
_segmentZKMetadata.getStatus().toString());
- _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
+ _consumerCoordinator = consumerCoordinator;
_acquiredConsumerSemaphore = new AtomicBoolean(false);
InstanceDataManagerConfig instanceDataManagerConfig =
indexLoadingConfig.getInstanceDataManagerConfig();
String clientIdSuffix =
@@ -1679,11 +1679,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
// Acquire semaphore to create stream consumers
try {
- long startTimeMs = System.currentTimeMillis();
- while (!_partitionGroupConsumerSemaphore.tryAcquire(5,
TimeUnit.MINUTES)) {
- _segmentLogger.warn("Failed to acquire partitionGroupConsumerSemaphore
in: {} ms. Retrying.",
- System.currentTimeMillis() - startTimeMs);
- }
+ _consumerCoordinator.acquire(llcSegmentName);
_acquiredConsumerSemaphore.set(true);
} catch (InterruptedException e) {
String errorMsg = "InterruptedException when acquiring the
partitionConsumerSemaphore";
@@ -1714,8 +1710,8 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
// In case of exception thrown here, segment goes to ERROR state. Then
any attempt to reset the segment from
// ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the
semaphore is acquired, but not released.
// Hence releasing the semaphore here to unblock reset operation via
Helix Admin.
- _segmentLogger.info("Releasing the _partitionGroupConsumerSemaphore");
- _partitionGroupConsumerSemaphore.release();
+ _segmentLogger.info("Releasing the consumer semaphore");
+ _consumerCoordinator.release();
_acquiredConsumerSemaphore.set(false);
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new
SegmentErrorInfo(now(),
"Failed to initialize segment data manager", t));
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index b070b743e0..7bffd4fdd8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -75,6 +75,8 @@ import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.FieldSpec;
@@ -106,8 +108,8 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
// consuming from the same stream partition can lead to bugs.
// The semaphores will stay in the hash map even if the consuming partitions
move to a different host.
// We expect that there will be a small number of semaphores, but that may
be ok.
- private final Map<Integer, Semaphore> _partitionGroupIdToSemaphoreMap = new
ConcurrentHashMap<>();
-
+ private final Map<Integer, ConsumerCoordinator>
_partitionGroupIdToConsumerCoordinatorMap =
+ new ConcurrentHashMap<>();
// The old name of the stats file used to be stats.ser which we changed when
we moved all packages
// from com.linkedin to org.apache because of not being able to deserialize
the old files using the newer classes
private static final String STATS_FILE_NAME = "segment-stats.ser";
@@ -142,6 +144,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
private TableDedupMetadataManager _tableDedupMetadataManager;
private TableUpsertMetadataManager _tableUpsertMetadataManager;
private BooleanSupplier _isTableReadyToConsumeData;
+ private boolean _enforceConsumptionInOrder = false;
public RealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
this(segmentBuildSemaphore, () -> true);
@@ -223,6 +226,8 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
_tableUpsertMetadataManager.init(_tableConfig, schema, this);
}
+ _enforceConsumptionInOrder = isEnforceConsumptionInOrder();
+
// For dedup and partial-upsert, need to wait for all segments loaded
before starting consuming data
if (isDedupEnabled() || isPartialUpsertEnabled()) {
_isTableReadyToConsumeData = new BooleanSupplier() {
@@ -511,7 +516,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
private void doAddConsumingSegment(String segmentName)
throws AttemptsExceededException, RetriableOperationException {
SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
- if (zkMetadata.getStatus() != Status.IN_PROGRESS) {
+ if ((zkMetadata.getStatus() != Status.IN_PROGRESS) &&
(!_enforceConsumptionInOrder)) {
// NOTE: We do not throw exception here because the segment might have
just been committed before the state
// transition is processed. We can skip adding this segment, and
the segment will enter CONSUMING state in
// Helix, then we can rely on the following CONSUMING -> ONLINE
state transition to add it.
@@ -543,7 +548,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
// Generates only one semaphore for every partition
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
int partitionGroupId = llcSegmentName.getPartitionGroupId();
- Semaphore semaphore =
_partitionGroupIdToSemaphoreMap.computeIfAbsent(partitionGroupId, k -> new
Semaphore(1));
+ ConsumerCoordinator consumerCoordinator =
getConsumerCoordinator(partitionGroupId);
// Create the segment data manager and register it
PartitionUpsertMetadataManager partitionUpsertMetadataManager =
@@ -553,8 +558,9 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
_tableDedupMetadataManager != null ?
_tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
: null;
RealtimeSegmentDataManager realtimeSegmentDataManager =
- createRealtimeSegmentDataManager(zkMetadata, tableConfig,
indexLoadingConfig, schema, llcSegmentName, semaphore,
- partitionUpsertMetadataManager, partitionDedupMetadataManager,
_isTableReadyToConsumeData);
+ createRealtimeSegmentDataManager(zkMetadata, tableConfig,
indexLoadingConfig, schema, llcSegmentName,
+ consumerCoordinator, partitionUpsertMetadataManager,
partitionDedupMetadataManager,
+ _isTableReadyToConsumeData);
registerSegment(segmentName, realtimeSegmentDataManager,
partitionUpsertMetadataManager);
if (partitionUpsertMetadataManager != null) {
partitionUpsertMetadataManager.trackNewlyAddedSegment(segmentName);
@@ -641,12 +647,12 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
@VisibleForTesting
protected RealtimeSegmentDataManager
createRealtimeSegmentDataManager(SegmentZKMetadata zkMetadata,
TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig, Schema
schema, LLCSegmentName llcSegmentName,
- Semaphore semaphore, PartitionUpsertMetadataManager
partitionUpsertMetadataManager,
+ ConsumerCoordinator consumerCoordinator, PartitionUpsertMetadataManager
partitionUpsertMetadataManager,
PartitionDedupMetadataManager partitionDedupMetadataManager,
BooleanSupplier isTableReadyToConsumeData)
throws AttemptsExceededException, RetriableOperationException {
return new RealtimeSegmentDataManager(zkMetadata, tableConfig, this,
_indexDir.getAbsolutePath(),
- indexLoadingConfig, schema, llcSegmentName, semaphore, _serverMetrics,
partitionUpsertMetadataManager,
- partitionDedupMetadataManager, isTableReadyToConsumeData);
+ indexLoadingConfig, schema, llcSegmentName, consumerCoordinator,
_serverMetrics,
+ partitionUpsertMetadataManager, partitionDedupMetadataManager,
isTableReadyToConsumeData);
}
/**
@@ -803,6 +809,21 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
registerSegment(segmentName, segmentDataManager);
}
+  /**
+   * Registers the segment through the parent implementation and, when in-order consumption is enforced, tells
+   * the partition group's {@link ConsumerCoordinator} about it so that waiting helix threads are notified.
+   *
+   * @return whatever the parent registerSegment returned
+   */
+  @Override
+  protected SegmentDataManager registerSegment(String segmentName, SegmentDataManager segmentDataManager) {
+    SegmentDataManager previousSegmentDataManager = super.registerSegment(segmentName, segmentDataManager);
+    if (!_enforceConsumptionInOrder) {
+      return previousSegmentDataManager;
+    }
+
+    // helix threads might be waiting for their respective previous segments to be loaded.
+    // they need to be notified here. Names that do not parse as LLC segment names are skipped.
+    LLCSegmentName parsedName = LLCSegmentName.of(segmentName);
+    if (parsedName != null) {
+      getConsumerCoordinator(parsedName.getPartitionGroupId()).trackSegment(parsedName);
+    }
+    return previousSegmentDataManager;
+  }
+
/**
* Replaces the CONSUMING segment with a downloaded committed one.
*/
@@ -852,6 +873,23 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
return Collections.emptyMap();
}
+  /**
+   * Returns the stream ingestion config of the table config, or null when the table has no ingestion config.
+   */
+  @Nullable
+  public StreamIngestionConfig getStreamIngestionConfig() {
+    IngestionConfig ingestionConfig = _tableConfig.getIngestionConfig();
+    if (ingestionConfig == null) {
+      return null;
+    }
+    return ingestionConfig.getStreamIngestionConfig();
+  }
+
+ // Returns the coordinator mapped to the partition group, creating it on first use. A new coordinator is
+ // constructed with the value _enforceConsumptionInOrder has at that moment.
+ @VisibleForTesting
+ ConsumerCoordinator getConsumerCoordinator(int partitionGroupId) {
+ return
_partitionGroupIdToConsumerCoordinatorMap.computeIfAbsent(partitionGroupId,
+ k -> new ConsumerCoordinator(_enforceConsumptionInOrder, this));
+ }
+
+ // Overrides the in-order consumption flag on this table data manager.
+ @VisibleForTesting
+ void setEnforceConsumptionInOrder(boolean enforceConsumptionInOrder) {
+ _enforceConsumptionInOrder = enforceConsumptionInOrder;
+ }
+
/**
* Validate a schema against the table config for real-time record
consumption.
* Ideally, we should validate these things when schema is added or table is
created, but either of these
@@ -886,4 +924,9 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
// 2. Validate the schema itself
SchemaUtils.validate(schema);
}
+
+  /**
+   * Tells whether the table's stream ingestion config asks for in-order consumption; false when the table has
+   * no stream ingestion config.
+   */
+  private boolean isEnforceConsumptionInOrder() {
+    StreamIngestionConfig config = getStreamIngestionConfig();
+    if (config == null) {
+      return false;
+    }
+    return config.isEnforceConsumptionInOrder();
+  }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java
new file mode 100644
index 0000000000..a2b01a44ea
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java
@@ -0,0 +1,485 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import com.google.common.cache.CacheBuilder;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+import org.apache.pinot.util.TestUtils;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class ConsumerCoordinatorTest {
+
+ private static class FakeRealtimeTableDataManager extends
RealtimeTableDataManager {
+ private final StreamIngestionConfig _streamIngestionConfig;
+ private ConsumerCoordinator _consumerCoordinator;
+
+ public FakeRealtimeTableDataManager(Semaphore segmentBuildSemaphore,
+ boolean useIdealStateToCalculatePreviousSegment) {
+ super(segmentBuildSemaphore);
+ super._recentlyDeletedSegments = CacheBuilder.newBuilder().build();
+ StreamIngestionConfig streamIngestionConfig = new
StreamIngestionConfig(List.of(new HashMap<>()));
+ streamIngestionConfig.setEnforceConsumptionInOrder(true);
+ if (useIdealStateToCalculatePreviousSegment) {
+ streamIngestionConfig.setUseIdealStateToCalculatePreviousSegment(true);
+ }
+ _streamIngestionConfig = streamIngestionConfig;
+ }
+
+ @Override
+ ConsumerCoordinator getConsumerCoordinator(int partitionId) {
+ return _consumerCoordinator;
+ }
+
+ public void setConsumerCoordinator(ConsumerCoordinator
consumerCoordinator) {
+ _consumerCoordinator = consumerCoordinator;
+ }
+
+ @Override
+ public StreamIngestionConfig getStreamIngestionConfig() {
+ return _streamIngestionConfig;
+ }
+
+ @Override
+ public String getServerInstance() {
+ return "server_1";
+ }
+ }
+
+ private static class FakeConsumerCoordinator extends ConsumerCoordinator {
+ private final Map<String, Map<String, String>> _segmentAssignmentMap;
+
+ public FakeConsumerCoordinator(boolean enforceConsumptionInOrder,
+ RealtimeTableDataManager realtimeTableDataManager) {
+ super(enforceConsumptionInOrder, realtimeTableDataManager);
+ Map<String, String> serverSegmentStatusMap = new HashMap<>() {{
+ put("server_1", "ONLINE");
+ put("server_3", "ONLINE");
+ }};
+ _segmentAssignmentMap = new HashMap<>() {{
+ put("tableTest_REALTIME__1__101__20250304T0035Z",
serverSegmentStatusMap);
+ put("tableTest_REALTIME__2__101__20250304T0035Z",
serverSegmentStatusMap);
+ put("tableTest_REALTIME__2__100__20250304T0035Z",
serverSegmentStatusMap);
+ put("tableTest_REALTIME__1__1__20250304T0035Z",
serverSegmentStatusMap);
+ put("tableTest_REALTIME__1__14__20250304T0035Z",
serverSegmentStatusMap);
+ put("tableTest_REALTIME__1__91__20250304T0035Z",
serverSegmentStatusMap);
+ put("tableTest_REALTIME__1__90__20250304T0035Z",
serverSegmentStatusMap);
+ }};
+ }
+
+ @Override
+ public Map<String, Map<String, String>> getSegmentAssignment() {
+ return _segmentAssignmentMap;
+ }
+ }
+
+ @Test
+ public void testAwaitForPreviousSegmentSequenceNumber()
+ throws InterruptedException {
+ // 1. enable tracking segment seq num.
+ FakeRealtimeTableDataManager realtimeTableDataManager = new
FakeRealtimeTableDataManager(null, false);
+ realtimeTableDataManager.setEnforceConsumptionInOrder(true);
+ FakeConsumerCoordinator consumerCoordinator = new
FakeConsumerCoordinator(true, realtimeTableDataManager);
+ realtimeTableDataManager.setConsumerCoordinator(consumerCoordinator);
+
+ // 2. check if thread waits on prev segment seq
+ AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+ Thread thread1 = new Thread(() -> {
+ LLCSegmentName llcSegmentName = getLLCSegment(101);
+ try {
+ boolean b =
consumerCoordinator.awaitForPreviousSegmentSequenceNumber(llcSegmentName, 5000);
+ atomicBoolean.set(b);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ thread1.start();
+
+ // 3. add prev segment and check if thread is unblocked.
+ consumerCoordinator.trackSegment(getLLCSegment(100));
+
+ TestUtils.waitForCondition(aVoid -> atomicBoolean.get(), 4000,
+ "Thread waiting on previous segment should have been unblocked.");
+ Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 100);
+
+ // 4. check if second thread waits on prev segment seq until timeout and
returns false
+ AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
+ Thread thread2 = new Thread(() -> {
+ LLCSegmentName llcSegmentName = getLLCSegment(102);
+ try {
+ boolean b =
consumerCoordinator.awaitForPreviousSegmentSequenceNumber(llcSegmentName, 500);
+ atomicBoolean2.set(b);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ thread2.start();
+
+ Thread.sleep(1500);
+
+ Assert.assertFalse(atomicBoolean2.get());
+ Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 100);
+ }
+
+ @Test
+ public void testFirstConsumer()
+ throws InterruptedException {
+ // 1. Enable tracking segment seq num.
+ FakeRealtimeTableDataManager realtimeTableDataManager = new
FakeRealtimeTableDataManager(null, false);
+ realtimeTableDataManager.setEnforceConsumptionInOrder(true);
+ FakeConsumerCoordinator consumerCoordinator = new
FakeConsumerCoordinator(true, realtimeTableDataManager);
+ realtimeTableDataManager.setConsumerCoordinator(consumerCoordinator);
+ ReentrantLock lock = (ReentrantLock) consumerCoordinator.getLock();
+ RealtimeSegmentDataManager mockedRealtimeSegmentDataManager =
getMockedRealtimeSegmentDataManager();
+ Map<String, String> serverSegmentStatusMap = new HashMap<>() {{
+ put("server_1", "ONLINE");
+ put("server_3", "ONLINE");
+ }};
+ consumerCoordinator.getSegmentAssignment().put(getSegmentName(100),
serverSegmentStatusMap);
+ consumerCoordinator.getSegmentAssignment().put(getSegmentName(102),
serverSegmentStatusMap);
+ consumerCoordinator.getSegmentAssignment().put(getSegmentName(104),
serverSegmentStatusMap);
+ consumerCoordinator.getSegmentAssignment().put(getSegmentName(106),
serverSegmentStatusMap);
+ consumerCoordinator.getSegmentAssignment().put(getSegmentName(107),
serverSegmentStatusMap);
+ consumerCoordinator.getSegmentAssignment().put(getSegmentName(109),
serverSegmentStatusMap);
+
+ // 2. create multiple helix transitions in this order: 106, 109, 104, 107
+ Thread thread1 = getNewThread(consumerCoordinator, getLLCSegment(106));
+ Thread thread2 = getNewThread(consumerCoordinator, getLLCSegment(109));
+ Thread thread3 = getNewThread(consumerCoordinator, getLLCSegment(104));
+ Thread thread4 = getNewThread(consumerCoordinator, getLLCSegment(107));
+
+ thread1.start();
+
+ Thread.sleep(1000);
+
+ // 3. load segment 100, 101, 102
+ realtimeTableDataManager.registerSegment(getSegmentName(100),
mockedRealtimeSegmentDataManager);
+ realtimeTableDataManager.registerSegment(getSegmentName(101),
mockedRealtimeSegmentDataManager);
+ realtimeTableDataManager.registerSegment(getSegmentName(102),
mockedRealtimeSegmentDataManager);
+ Thread.sleep(1000);
+
+ // 4. check that the only thread started so far (segment 106) is still waiting
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
1);
+ Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+ Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102);
+
Assert.assertFalse(consumerCoordinator.getFirstTransitionProcessed().get());
+
+ thread2.start();
+ thread3.start();
+ thread4.start();
+
+ Thread.sleep(1000);
+
+ // 5. check that first thread acquiring semaphore is of segment 104
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
+ Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+ Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102);
+ Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
+
+ realtimeTableDataManager.registerSegment(getSegmentName(104),
mockedRealtimeSegmentDataManager);
+ Thread.sleep(1000);
+
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
+ Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+ Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 104);
+ Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
+ Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(),
1);
+
+ // 6. check that the next thread acquiring the semaphore is for segment 106
+ consumerCoordinator.getSemaphore().release();
+ realtimeTableDataManager.registerSegment(getSegmentName(106),
mockedRealtimeSegmentDataManager);
+
+ Thread.sleep(1000);
+
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
+ Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+ Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 106);
+ Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
+ Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(),
1);
+ }
+
+ @Test
+ public void testSequentialOrderNotRelyingOnIdealState()
+ throws InterruptedException {
+ // 1. Enable tracking segment seq num.
+ FakeRealtimeTableDataManager realtimeTableDataManager = new
FakeRealtimeTableDataManager(null, false);
+ realtimeTableDataManager.setEnforceConsumptionInOrder(true);
+
+ FakeConsumerCoordinator consumerCoordinator = new
FakeConsumerCoordinator(true, realtimeTableDataManager);
+ realtimeTableDataManager.setConsumerCoordinator(consumerCoordinator);
+ ReentrantLock lock = (ReentrantLock) consumerCoordinator.getLock();
+
+ // 2. check first transition blocked on ideal state
+ Thread thread1 = getNewThread(consumerCoordinator, getLLCSegment(101));
+ thread1.start();
+
+ Thread.sleep(2000);
+
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
1);
+ Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+ Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), -1);
+
Assert.assertFalse(consumerCoordinator.getFirstTransitionProcessed().get());
+
+ RealtimeSegmentDataManager mockedRealtimeSegmentDataManager =
getMockedRealtimeSegmentDataManager();
+
+ // 3. register older segment and check seq num watermark and semaphore.
+ realtimeTableDataManager.registerSegment(getSegmentName(90),
mockedRealtimeSegmentDataManager);
+ Thread.sleep(1000);
+
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
1);
+ Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+ Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 90);
+
Assert.assertFalse(consumerCoordinator.getFirstTransitionProcessed().get());
+
+ // 4. register prev segment and check watermark and if thread was unblocked
+ realtimeTableDataManager.registerSegment(getSegmentName(91),
mockedRealtimeSegmentDataManager);
+ Thread.sleep(1000);
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
+ Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+ Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 91);
+ Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
+
+ // 5. check that all the following transitions rely on seq num watermark
and get blocked.
+ Thread thread2 = getNewThread(consumerCoordinator, getLLCSegment(102));
+ Thread thread3 = getNewThread(consumerCoordinator, getLLCSegment(103));
+ Thread thread4 = getNewThread(consumerCoordinator, getLLCSegment(104));
+ thread3.start();
+ thread2.start();
+ thread4.start();
+
+ Thread.sleep(2000);
+
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
+ Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+ Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 91);
+ Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
+
+ // 6. check that all above threads are still blocked even if semaphore is
released.
+ consumerCoordinator.getSemaphore().release();
+
+ Thread.sleep(1000);
+
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
1);
+ Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+ Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 91);
+ Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
+
+ // 6. mark 101 seg as complete. Check 102 acquired the semaphore.
+ realtimeTableDataManager.registerSegment(getSegmentName(101),
mockedRealtimeSegmentDataManager);
+
+ Thread.sleep(1000);
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
+ Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+ Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 101);
+ Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
+ Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(),
0);
+
+ // 7. register 102 seg, check if seg 103 is waiting on semaphore.
+ realtimeTableDataManager.registerSegment(getSegmentName(102),
mockedRealtimeSegmentDataManager);
+
+ Thread.sleep(1000);
+ Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+ Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102);
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
+ Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
+ Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(),
1);
+
+ // 8. release the semaphore and check if semaphore is acquired by seg 103.
+ consumerCoordinator.getSemaphore().release();
+ Thread.sleep(1000);
+ Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+ Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102);
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
+ Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
+ Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(),
0);
+
+ // 9. register 103 seg and check if seg 104 is now queued on semaphore
+ realtimeTableDataManager.registerSegment(getSegmentName(103),
mockedRealtimeSegmentDataManager);
+ Thread.sleep(1000);
+ Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+ Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 103);
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
+ Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(),
1);
+ }
+
+ @Test
+ public void testSequentialOrderRelyingOnIdealState()
+ throws InterruptedException {
+ FakeRealtimeTableDataManager realtimeTableDataManager = new
FakeRealtimeTableDataManager(null, true);
+ realtimeTableDataManager.setEnforceConsumptionInOrder(true);
+
+ FakeConsumerCoordinator consumerCoordinator = new
FakeConsumerCoordinator(true, realtimeTableDataManager);
+ realtimeTableDataManager.setConsumerCoordinator(consumerCoordinator);
+
+ // 1. test that acquire blocks when prev segment is not loaded.
+ Thread thread = getNewThread(consumerCoordinator, getLLCSegment(101));
+ thread.start();
+
+ ReentrantLock lock = (ReentrantLock) consumerCoordinator.getLock();
+
+ RealtimeSegmentDataManager mockedRealtimeSegmentDataManager =
Mockito.mock(RealtimeSegmentDataManager.class);
+
Mockito.when(mockedRealtimeSegmentDataManager.increaseReferenceCount()).thenReturn(true);
+ Assert.assertNotNull(realtimeTableDataManager);
+
+ // prev segment has seq 91, so registering seq 90 won't do anything.
+ realtimeTableDataManager.registerSegment(getSegmentName(90),
mockedRealtimeSegmentDataManager);
+
+ Thread.sleep(2000);
+
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
1);
+
+ // 2. test that registering prev segment will unblock thread.
+ realtimeTableDataManager.registerSegment(getSegmentName(91),
mockedRealtimeSegmentDataManager);
+
+ TestUtils.waitForCondition(aVoid ->
(consumerCoordinator.getSemaphore().availablePermits() == 0), 5000,
+ "Semaphore must be acquired after registering previous segment");
+ Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+ Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
+
+ consumerCoordinator.release();
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
1);
+ Assert.assertFalse(consumerCoordinator.getSemaphore().hasQueuedThreads());
+ Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+ Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), -1);
+ realtimeTableDataManager.registerSegment(getSegmentName(101),
mockedRealtimeSegmentDataManager);
+
+ // 3. test that segment 103 will be blocked.
+ Map<String, String> serverSegmentStatusMap = new HashMap<>() {{
+ put("server_1", "ONLINE");
+ put("server_3", "ONLINE");
+ }};
+ consumerCoordinator.getSegmentAssignment().put(getSegmentName(102),
serverSegmentStatusMap);
+ consumerCoordinator.getSegmentAssignment().put(getSegmentName(103),
serverSegmentStatusMap);
+ consumerCoordinator.getSegmentAssignment().put(getSegmentName(104),
serverSegmentStatusMap);
+
+ Thread thread1 = getNewThread(consumerCoordinator, getLLCSegment(103));
+ thread1.start();
+
+ Thread.sleep(1000);
+
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
1);
+
+ // 3. test that segment 102 will acquire semaphore.
+ Thread thread2 = getNewThread(consumerCoordinator, getLLCSegment(102));
+ thread2.start();
+
+ Thread.sleep(1000);
+
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
+ Assert.assertFalse(consumerCoordinator.getSemaphore().hasQueuedThreads());
+
+ // 4. registering seg 102 should unblock seg 103
+ realtimeTableDataManager.registerSegment(getSegmentName(102),
mockedRealtimeSegmentDataManager);
+
+ Thread.sleep(1000);
+
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
+ Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(),
1);
+
+ // 5. releasing semaphore should let seg 103 acquire it
+ consumerCoordinator.getSemaphore().release();
+
+ Thread.sleep(1000);
+
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
+ Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(),
0);
+ Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+ Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), -1);
+ }
+
+ @Test
+ public void testRandomOrder()
+ throws InterruptedException {
+ RealtimeTableDataManager realtimeTableDataManager =
Mockito.mock(RealtimeTableDataManager.class);
+
Mockito.when(realtimeTableDataManager.getTableName()).thenReturn("tableTest_REALTIME");
+
+ FakeConsumerCoordinator consumerCoordinator = new
FakeConsumerCoordinator(false, realtimeTableDataManager);
+
+ String segmentName = "tableTest_REALTIME__1__101__20250304T0035Z";
+ LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+ Assert.assertNotNull(llcSegmentName);
+ consumerCoordinator.acquire(llcSegmentName);
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
+ Assert.assertFalse(consumerCoordinator.getSemaphore().hasQueuedThreads());
+
+ consumerCoordinator.release();
+ Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
1);
+ Assert.assertFalse(consumerCoordinator.getSemaphore().hasQueuedThreads());
+ }
+
+ @Test
+ public void testPreviousSegment() {
+ RealtimeTableDataManager realtimeTableDataManager =
Mockito.mock(RealtimeTableDataManager.class);
+
Mockito.when(realtimeTableDataManager.getTableName()).thenReturn("tableTest_REALTIME");
+
Mockito.when(realtimeTableDataManager.getServerInstance()).thenReturn("server_1");
+
+ FakeConsumerCoordinator consumerCoordinator = new
FakeConsumerCoordinator(true, realtimeTableDataManager);
+
+ String segmentName = "tableTest_REALTIME__1__101__20250304T0035Z";
+ LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+ Assert.assertNotNull(llcSegmentName);
+ String previousSegment =
consumerCoordinator.getPreviousSegmentFromIdealState(llcSegmentName);
+ Assert.assertEquals(previousSegment,
"tableTest_REALTIME__1__91__20250304T0035Z");
+
+ consumerCoordinator.getSegmentAssignment().clear();
+ Map<String, String> serverSegmentStatusMap = new HashMap<>() {{
+ put("server_3", "ONLINE");
+ }};
+ consumerCoordinator.getSegmentAssignment().put(getSegmentName(100),
serverSegmentStatusMap);
+ previousSegment =
consumerCoordinator.getPreviousSegmentFromIdealState(llcSegmentName);
+ Assert.assertNull(previousSegment);
+ }
+
+ private Thread getNewThread(FakeConsumerCoordinator consumerCoordinator,
LLCSegmentName llcSegmentName) {
+ return new Thread(() -> {
+ try {
+ consumerCoordinator.acquire(llcSegmentName);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }, String.valueOf(llcSegmentName.getSequenceNumber()));
+ }
+
+ private RealtimeSegmentDataManager getMockedRealtimeSegmentDataManager() {
+ RealtimeSegmentDataManager mockedRealtimeSegmentDataManager =
Mockito.mock(RealtimeSegmentDataManager.class);
+
Mockito.when(mockedRealtimeSegmentDataManager.increaseReferenceCount()).thenReturn(true);
+ Assert.assertNotNull(mockedRealtimeSegmentDataManager);
+ return mockedRealtimeSegmentDataManager;
+ }
+
+ private LLCSegmentName getLLCSegment(int seqNum) {
+ String segmentName = getSegmentName(seqNum);
+ LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+ Assert.assertNotNull(llcSegmentName);
+ return llcSegmentName;
+ }
+
+ private String getSegmentName(int seqNum) {
+ return "tableTest_REALTIME__1__" + seqNum + "__20250304T0035Z";
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 623d0de37d..72dbf26acb 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -97,7 +97,8 @@ public class RealtimeSegmentDataManagerTest {
private static final long START_OFFSET_VALUE = 198L;
private static final LongMsgOffset START_OFFSET = new
LongMsgOffset(START_OFFSET_VALUE);
- private final Map<Integer, Semaphore> _partitionGroupIdToSemaphoreMap = new
ConcurrentHashMap<>();
+ private final Map<Integer, ConsumerCoordinator>
_partitionGroupIdToConsumerCoordinatorMap =
+ new ConcurrentHashMap<>();
private static TableConfig createTableConfig()
throws Exception {
@@ -166,12 +167,13 @@ public class RealtimeSegmentDataManagerTest {
tableConfig.getIngestionConfig().setRetryOnSegmentBuildPrecheckFailure(true);
RealtimeTableDataManager tableDataManager =
createTableDataManager(tableConfig);
LLCSegmentName llcSegmentName = new LLCSegmentName(SEGMENT_NAME_STR);
- _partitionGroupIdToSemaphoreMap.putIfAbsent(PARTITION_GROUP_ID, new
Semaphore(1));
+ _partitionGroupIdToConsumerCoordinatorMap.putIfAbsent(PARTITION_GROUP_ID,
+ new ConsumerCoordinator(false, tableDataManager));
Schema schema = Fixtures.createSchema();
ServerMetrics serverMetrics = new
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
return new FakeRealtimeSegmentDataManager(segmentZKMetadata, tableConfig,
tableDataManager,
new File(TEMP_DIR, REALTIME_TABLE_NAME).getAbsolutePath(), schema,
llcSegmentName,
- _partitionGroupIdToSemaphoreMap, serverMetrics, timeSupplier);
+ _partitionGroupIdToConsumerCoordinatorMap, serverMetrics,
timeSupplier);
}
@BeforeClass
@@ -992,7 +994,7 @@ public class RealtimeSegmentDataManagerTest {
private boolean _notifySegmentBuildFailedWithDeterministicErrorCalled =
false;
public boolean _throwExceptionFromConsume = false;
public boolean _postConsumeStoppedCalled = false;
- public Map<Integer, Semaphore> _semaphoreMap;
+ public Map<Integer, ConsumerCoordinator> _consumerCoordinatorMap;
public boolean _stubConsumeLoop = true;
private TimeSupplier _timeSupplier;
private boolean _indexCapacityThresholdBreached;
@@ -1009,12 +1011,13 @@ public class RealtimeSegmentDataManagerTest {
public FakeRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata,
TableConfig tableConfig,
RealtimeTableDataManager realtimeTableDataManager, String
resourceDataDir, Schema schema,
- LLCSegmentName llcSegmentName, Map<Integer, Semaphore> semaphoreMap,
ServerMetrics serverMetrics,
- TimeSupplier timeSupplier)
+ LLCSegmentName llcSegmentName, Map<Integer, ConsumerCoordinator>
consumerCoordinatorMap,
+ ServerMetrics serverMetrics, TimeSupplier timeSupplier)
throws Exception {
super(segmentZKMetadata, tableConfig, realtimeTableDataManager,
resourceDataDir,
new IndexLoadingConfig(makeInstanceDataManagerConfig(),
tableConfig), schema, llcSegmentName,
- semaphoreMap.get(llcSegmentName.getPartitionGroupId()),
serverMetrics, null, null, () -> true);
+ consumerCoordinatorMap.get(llcSegmentName.getPartitionGroupId()),
serverMetrics, null, null,
+ () -> true);
_state = RealtimeSegmentDataManager.class.getDeclaredField("_state");
_state.setAccessible(true);
_shouldStop =
RealtimeSegmentDataManager.class.getDeclaredField("_shouldStop");
@@ -1024,7 +1027,7 @@ public class RealtimeSegmentDataManagerTest {
_segmentBuildFailedWithDeterministicError =
RealtimeSegmentDataManager.class.getDeclaredField("_segmentBuildFailedWithDeterministicError");
_segmentBuildFailedWithDeterministicError.setAccessible(true);
- _semaphoreMap = semaphoreMap;
+ _consumerCoordinatorMap = consumerCoordinatorMap;
_streamMsgOffsetFactory =
RealtimeSegmentDataManager.class.getDeclaredField("_streamPartitionMsgOffsetFactory");
_streamMsgOffsetFactory.setAccessible(true);
_streamMsgOffsetFactory.set(this, new LongMsgOffsetFactory());
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java
index 960138e8a0..8899fbd723 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java
@@ -18,10 +18,10 @@
*/
package org.apache.pinot.integration.tests.realtime.utils;
-import java.util.concurrent.Semaphore;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.core.data.manager.realtime.ConsumerCoordinator;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -43,16 +43,15 @@ public class FailureInjectingRealtimeSegmentDataManager
extends RealtimeSegmentD
/**
* Creates a manager that will forcibly fail the commit segment step.
*/
- public FailureInjectingRealtimeSegmentDataManager(SegmentZKMetadata
segmentZKMetadata,
- TableConfig tableConfig, RealtimeTableDataManager
realtimeTableDataManager, String resourceDataDir,
- IndexLoadingConfig indexLoadingConfig, Schema schema, LLCSegmentName
llcSegmentName,
- Semaphore partitionGroupConsumerSemaphore, ServerMetrics serverMetrics,
- boolean failCommit) throws AttemptsExceededException,
RetriableOperationException {
+ public FailureInjectingRealtimeSegmentDataManager(SegmentZKMetadata
segmentZKMetadata, TableConfig tableConfig,
+ RealtimeTableDataManager realtimeTableDataManager, String
resourceDataDir, IndexLoadingConfig indexLoadingConfig,
+ Schema schema, LLCSegmentName llcSegmentName, ConsumerCoordinator
consumerCoordinator,
+ ServerMetrics serverMetrics, boolean failCommit)
+ throws AttemptsExceededException, RetriableOperationException {
// Pass through to the real parent constructor
- super(segmentZKMetadata, tableConfig, realtimeTableDataManager,
resourceDataDir,
- indexLoadingConfig, schema, llcSegmentName,
partitionGroupConsumerSemaphore, serverMetrics,
- null /* no PartitionUpsertMetadataManager */, null /* no
PartitionDedupMetadataManager */,
- () -> true /* isReadyToConsumeData always true for tests */);
+ super(segmentZKMetadata, tableConfig, realtimeTableDataManager,
resourceDataDir, indexLoadingConfig, schema,
+ llcSegmentName, consumerCoordinator, serverMetrics, null /* no
PartitionUpsertMetadataManager */,
+ null /* no PartitionDedupMetadataManager */, () -> true /*
isReadyToConsumeData always true for tests */);
_failCommit = failCommit;
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
index 278e2bcb58..af65cd1a79 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
@@ -25,6 +25,7 @@ import java.util.function.Supplier;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.PauselessConsumptionUtils;
+import org.apache.pinot.core.data.manager.realtime.ConsumerCoordinator;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
@@ -52,7 +53,7 @@ public class FailureInjectingRealtimeTableDataManager extends
RealtimeTableDataM
@Override
protected RealtimeSegmentDataManager
createRealtimeSegmentDataManager(SegmentZKMetadata zkMetadata,
TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig, Schema
schema, LLCSegmentName llcSegmentName,
- Semaphore semaphore, PartitionUpsertMetadataManager
partitionUpsertMetadataManager,
+ ConsumerCoordinator consumerCoordinator, PartitionUpsertMetadataManager
partitionUpsertMetadataManager,
PartitionDedupMetadataManager partitionDedupMetadataManager,
BooleanSupplier isTableReadyToConsumeData)
throws AttemptsExceededException, RetriableOperationException {
@@ -61,6 +62,6 @@ public class FailureInjectingRealtimeTableDataManager extends
RealtimeTableDataM
addFailureToCommits = false;
}
return new FailureInjectingRealtimeSegmentDataManager(zkMetadata,
tableConfig, this, _indexDir.getAbsolutePath(),
- indexLoadingConfig, schema, llcSegmentName, semaphore, _serverMetrics,
addFailureToCommits);
+ indexLoadingConfig, schema, llcSegmentName, consumerCoordinator,
_serverMetrics, addFailureToCommits);
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
index e1400620fb..da17a2c020 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
@@ -39,10 +39,17 @@ public class StreamIngestionConfig extends BaseJsonConfig {
private boolean _columnMajorSegmentBuilderEnabled = true;
@JsonPropertyDescription("Whether to track offsets of the filtered stream
messages during consumption.")
- private boolean _trackFilteredMessageOffsets = false;
+ private boolean _trackFilteredMessageOffsets;
@JsonPropertyDescription("Whether pauseless consumption is enabled for the
table")
- private boolean _pauselessConsumptionEnabled = false;
+ private boolean _pauselessConsumptionEnabled;
+
+ @JsonPropertyDescription("Enforce consumption of segments in order of
segment creation by the controller")
+ private boolean _enforceConsumptionInOrder;
+
+ @JsonPropertyDescription("If enabled, Server always relies on ideal state to
get previous segment. If disabled, "
+ + "server uses sequence id - 1 for previous segment")
+ private boolean _useIdealStateToCalculatePreviousSegment;
@JsonPropertyDescription("Policy to determine the behaviour of parallel
consumption.")
private ParallelSegmentConsumptionPolicy _parallelSegmentConsumptionPolicy;
@@ -80,6 +87,22 @@ public class StreamIngestionConfig extends BaseJsonConfig {
_pauselessConsumptionEnabled = pauselessConsumptionEnabled;
}
+ public boolean isEnforceConsumptionInOrder() {
+ return _enforceConsumptionInOrder;
+ }
+
+ public void setEnforceConsumptionInOrder(boolean enforceConsumptionInOrder) {
+ _enforceConsumptionInOrder = enforceConsumptionInOrder;
+ }
+
+ public boolean isUseIdealStateToCalculatePreviousSegment() {
+ return _useIdealStateToCalculatePreviousSegment;
+ }
+
+ public void setUseIdealStateToCalculatePreviousSegment(boolean
useIdealStateToCalculatePreviousSegment) {
+ _useIdealStateToCalculatePreviousSegment =
useIdealStateToCalculatePreviousSegment;
+ }
+
@Nullable
public ParallelSegmentConsumptionPolicy
getParallelSegmentConsumptionPolicy() {
return _parallelSegmentConsumptionPolicy;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org