This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 20ec20da88 Fix starvation in consumer lock (#15404)
20ec20da88 is described below
commit 20ec20da88cf822705043f38213e435626a33de6
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Mar 28 22:25:40 2025 -0600
Fix starvation in consumer lock (#15404)
---
.../core/data/manager/BaseTableDataManager.java | 5 +
.../data/manager/realtime/ConsumerCoordinator.java | 290 +++++++++------------
.../realtime/RealtimeSegmentDataManager.java | 34 +--
.../manager/realtime/RealtimeTableDataManager.java | 97 +++----
.../realtime/SegmentAlreadyConsumedException.java | 26 --
.../manager/realtime/ConsumerCoordinatorTest.java | 148 ++++-------
.../realtime/RealtimeSegmentDataManagerTest.java | 55 ----
7 files changed, 229 insertions(+), 426 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 13edaf56cf..0070c8562f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -344,6 +344,11 @@ public abstract class BaseTableDataManager implements
TableDataManager {
protected abstract void doAddOnlineSegment(String segmentName)
throws Exception;
+ @Nullable
+ public SegmentZKMetadata fetchZKMetadataNullable(String segmentName) {
+ return ZKMetadataProvider.getSegmentZKMetadata(_propertyStore,
_tableNameWithType, segmentName);
+ }
+
@Override
public SegmentZKMetadata fetchZKMetadata(String segmentName) {
SegmentZKMetadata zkMetadata =
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
index 8d82a17e9a..6510418598 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
@@ -34,9 +34,8 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.helix.HelixHelper;
-import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
-import org.apache.pinot.spi.utils.CommonConstants;
+import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,61 +47,63 @@ public class ConsumerCoordinator {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConsumerCoordinator.class);
private static final long WAIT_INTERVAL_MS = TimeUnit.MINUTES.toMillis(3);
- private final Semaphore _semaphore;
private final boolean _enforceConsumptionInOrder;
- private final Condition _condition;
- private final Lock _lock;
- private final ServerMetrics _serverMetrics;
- private final boolean _alwaysRelyOnIdealState;
private final RealtimeTableDataManager _realtimeTableDataManager;
- private final AtomicBoolean _firstTransitionProcessed;
+ private final boolean _useIdealStateToCalculatePreviousSegment;
+ private final ServerMetrics _serverMetrics;
+
+ // We use semaphore of 1 permit instead of lock because the semaphore is
shared across multiple threads, and it can be
+ // released by a different thread than the one that acquired it. There is no
out-of-box Lock implementation that
+ // allows releasing the lock from a different thread.
+ private final Semaphore _semaphore = new Semaphore(1);
+ private final Lock _lock = new ReentrantLock();
+ private final Condition _condition = _lock.newCondition();
+ private final AtomicBoolean _firstTransitionProcessed = new
AtomicBoolean(false);
- private volatile int _maxSegmentSeqNumRegistered = -1;
+ private volatile int _maxSequenceNumberRegistered = -1;
public ConsumerCoordinator(boolean enforceConsumptionInOrder,
RealtimeTableDataManager realtimeTableDataManager) {
- _semaphore = new Semaphore(1);
- _lock = new ReentrantLock();
- _condition = _lock.newCondition();
_enforceConsumptionInOrder = enforceConsumptionInOrder;
_realtimeTableDataManager = realtimeTableDataManager;
StreamIngestionConfig streamIngestionConfig =
realtimeTableDataManager.getStreamIngestionConfig();
- if (streamIngestionConfig != null) {
- // if isUseIdealStateToCalculatePreviousSegment is true, server relies
on ideal state to fetch previous segment
- // to a segment for all helix transitions.
- _alwaysRelyOnIdealState =
streamIngestionConfig.isUseIdealStateToCalculatePreviousSegment();
- } else {
- _alwaysRelyOnIdealState = false;
- }
- _firstTransitionProcessed = new AtomicBoolean(false);
+ _useIdealStateToCalculatePreviousSegment =
+ streamIngestionConfig != null &&
streamIngestionConfig.isUseIdealStateToCalculatePreviousSegment();
_serverMetrics = ServerMetrics.get();
}
public void acquire(LLCSegmentName llcSegmentName)
- throws InterruptedException {
+ throws InterruptedException, ShouldNotConsumeException {
+ String segmentName = llcSegmentName.getSegmentName();
if (_enforceConsumptionInOrder) {
long startTimeMs = System.currentTimeMillis();
- waitForPrevSegment(llcSegmentName);
+ SegmentZKMetadata segmentZKMetadata =
waitForPreviousSegment(llcSegmentName);
_serverMetrics.addTimedTableValue(_realtimeTableDataManager.getTableName(),
ServerTimer.PREV_SEGMENT_WAIT_TIME_MS,
System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
- if (isSegmentAlreadyConsumed(llcSegmentName.getSegmentName())) {
- // if segment is already consumed, just return from here.
- // NOTE: if segment is deleted, this segment will never be registered
and helix thread waiting on
- // watermark for prev segment won't be notified. All such helix
threads will fallback to rely on ideal
- // state for previous segment.
- throw new
SegmentAlreadyConsumedException(llcSegmentName.getSegmentName());
+ // When consumption order is enforced, unless the segment is deleted, we
wait until the previous segment is
+ // registered regardless of whether ZK metadata status has changed to
guarantee the consumption ordering.
+ //
+ // Prevent the following scenario:
+ // - Seg 100 (OFFLINE -> CONSUMING pending)
+ //
+ // - Seg 101 (OFFLINE -> CONSUMING returned because of status change)
+ // - Seg 101 (CONSUMING -> ONLINE processed)
+ //
+ // - Seg 102 (OFFLINE -> CONSUMING started consuming while 100 is not
registered)
+ if (segmentZKMetadata != null) {
+ checkSegmentStatus(segmentZKMetadata);
}
}
long startTimeMs = System.currentTimeMillis();
while (!_semaphore.tryAcquire(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) {
- String currSegmentName = llcSegmentName.getSegmentName();
- LOGGER.warn("Failed to acquire consumer semaphore for segment: {} in:
{}ms. Retrying.", currSegmentName,
+ LOGGER.warn("Failed to acquire consumer semaphore for segment: {} in:
{}ms. Retrying.", segmentName,
System.currentTimeMillis() - startTimeMs);
-
- if (isSegmentAlreadyConsumed(currSegmentName)) {
- throw new SegmentAlreadyConsumedException(currSegmentName);
+ SegmentZKMetadata segmentZKMetadata =
_realtimeTableDataManager.fetchZKMetadataNullable(segmentName);
+ if (segmentZKMetadata == null) {
+ throw new ShouldNotConsumeException("Segment: " + segmentName + " is
deleted");
}
+ checkSegmentStatus(segmentZKMetadata);
}
}
@@ -115,177 +116,126 @@ public class ConsumerCoordinator {
return _semaphore;
}
- public void trackSegment(LLCSegmentName llcSegmentName) {
+ public void register(LLCSegmentName llcSegmentName) {
_lock.lock();
try {
- if (!_alwaysRelyOnIdealState) {
- _maxSegmentSeqNumRegistered = Math.max(_maxSegmentSeqNumRegistered,
llcSegmentName.getSequenceNumber());
+ int sequenceNumber = llcSegmentName.getSequenceNumber();
+ if (sequenceNumber > _maxSequenceNumberRegistered) {
+ _maxSequenceNumberRegistered = sequenceNumber;
+ // notify all helix threads waiting for their offline -> consuming
segment's prev segment to be loaded
+ _condition.signalAll();
}
- // notify all helix threads waiting for their offline -> consuming
segment's prev segment to be loaded
- _condition.signalAll();
} finally {
_lock.unlock();
}
}
- private void waitForPrevSegment(LLCSegmentName currSegment)
- throws InterruptedException {
-
- if (_alwaysRelyOnIdealState || !_firstTransitionProcessed.get()) {
- // if _alwaysRelyOnIdealState or no offline -> consuming transition has
been processed, it means rely on
- // ideal state to fetch previous segment.
- awaitForPreviousSegmentFromIdealState(currSegment);
-
- // the first transition will always be prone to error, consider edge
case where segment previous to current
- // helix transition's segment was deleted and this server came alive
after successful deletion. the prev
- // segment will not exist, hence first transition is handled using
isFirstTransitionSuccessful.
- _firstTransitionProcessed.set(true);
- return;
- }
-
- // rely on _maxSegmentSeqNumRegistered watermark for previous segment.
- if (awaitForPreviousSegmentSequenceNumber(currSegment, WAIT_INTERVAL_MS)) {
- return;
- }
-
- // tried using prevSegSeqNumber watermark, but could not acquire the
previous segment.
- // fallback to acquire prev segment from ideal state.
- awaitForPreviousSegmentFromIdealState(currSegment);
- }
-
- private void awaitForPreviousSegmentFromIdealState(LLCSegmentName
currSegment)
- throws InterruptedException {
- String previousSegment = getPreviousSegmentFromIdealState(currSegment);
- if (previousSegment == null) {
- // previous segment can only be null if either all the previous segments
are deleted or this is the starting
- // sequence segment of the partition Group.
- return;
- }
-
- SegmentDataManager segmentDataManager =
_realtimeTableDataManager.acquireSegment(previousSegment);
- try {
- long startTimeMs = System.currentTimeMillis();
- _lock.lock();
- try {
- while (segmentDataManager == null) {
- // if segmentDataManager == null, it means segment is not loaded in
the server.
- // wait until it's loaded.
- if (!_condition.await(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) {
- LOGGER.warn("Semaphore access denied to segment: {}. Waited on
previous segment: {} for: {}ms.",
- currSegment.getSegmentName(), previousSegment,
System.currentTimeMillis() - startTimeMs);
-
- // waited until timeout, fetch previous segment again from ideal
state as previous segment might be
- // changed in ideal state.
- previousSegment = getPreviousSegmentFromIdealState(currSegment);
- if (previousSegment == null) {
- return;
- }
- }
- segmentDataManager =
_realtimeTableDataManager.acquireSegment(previousSegment);
- }
- } finally {
- _lock.unlock();
- }
- } finally {
- if (segmentDataManager != null) {
- _realtimeTableDataManager.releaseSegment(segmentDataManager);
+ /**
+ * Waits for the previous segment to be registered to the server. Returns
the segment ZK metadata fetched during the
+ * wait to reduce unnecessary ZK reads.
+ */
+ @Nullable
+ private SegmentZKMetadata waitForPreviousSegment(LLCSegmentName
currentSegment)
+ throws InterruptedException, ShouldNotConsumeException {
+ if (!_firstTransitionProcessed.get() ||
_useIdealStateToCalculatePreviousSegment) {
+ SegmentZKMetadata segmentZKMetadata = null;
+ if (_maxSequenceNumberRegistered < currentSegment.getSequenceNumber() -
1) {
+ int previousSegmentSequenceNumber =
getPreviousSegmentSequenceNumberFromIdealState(currentSegment);
+ segmentZKMetadata = waitForPreviousSegment(currentSegment,
previousSegmentSequenceNumber);
}
+ _firstTransitionProcessed.set(true);
+ return segmentZKMetadata;
+ } else {
+ return waitForPreviousSegment(currentSegment,
currentSegment.getSequenceNumber() - 1);
}
}
- /***
- * @param currSegment is the segment of current helix transition.
- * @param timeoutMs is max time to wait in millis
- * @return true if previous Segment was registered to the server, else false.
- * @throws InterruptedException
+ /**
+ * Waits for the previous segment with the sequence number to be registered
to the server. Returns the segment ZK
metadata fetched during the wait to reduce unnecessary ZK reads.
*/
+ @Nullable
@VisibleForTesting
- boolean awaitForPreviousSegmentSequenceNumber(LLCSegmentName currSegment,
long timeoutMs)
- throws InterruptedException {
+ SegmentZKMetadata waitForPreviousSegment(LLCSegmentName currentSegment, int
previousSegmentSequenceNumber)
+ throws InterruptedException, ShouldNotConsumeException {
+ if (previousSegmentSequenceNumber <= _maxSequenceNumberRegistered) {
+ return null;
+ }
+ SegmentZKMetadata segmentZKMetadata = null;
long startTimeMs = System.currentTimeMillis();
- int prevSeqNum = currSegment.getSequenceNumber() - 1;
_lock.lock();
try {
- while (_maxSegmentSeqNumRegistered < prevSeqNum) {
+ while (previousSegmentSequenceNumber > _maxSequenceNumberRegistered) {
// it means the previous segment is not loaded in the server. Wait
until it's loaded.
- if (!_condition.await(timeoutMs, TimeUnit.MILLISECONDS)) {
- LOGGER.warn(
- "Semaphore access denied to segment: {}. Waited on previous
segment with sequence number: {} for: {}ms.",
- currSegment.getSegmentName(), prevSeqNum,
System.currentTimeMillis() - startTimeMs);
-
- // waited until the timeout. Rely on ideal state now.
- return _maxSegmentSeqNumRegistered >= prevSeqNum;
+ if (!_condition.await(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) {
+ String segmentName = currentSegment.getSegmentName();
+ LOGGER.warn("Waited on previous segment with sequence number: {}
for: {}ms. "
+ + "Refreshing the previous segment sequence number for
current segment: {}",
+ previousSegmentSequenceNumber, System.currentTimeMillis() -
startTimeMs, segmentName);
+ segmentZKMetadata =
_realtimeTableDataManager.fetchZKMetadataNullable(segmentName);
+ if (segmentZKMetadata == null) {
+ throw new ShouldNotConsumeException("Segment: " + segmentName + "
is deleted");
+ }
+ previousSegmentSequenceNumber =
getPreviousSegmentSequenceNumberFromIdealState(currentSegment);
}
}
- return true;
+ return segmentZKMetadata;
} finally {
_lock.unlock();
}
}
@VisibleForTesting
- @Nullable
- String getPreviousSegmentFromIdealState(LLCSegmentName currSegment) {
+ int getPreviousSegmentSequenceNumberFromIdealState(LLCSegmentName
currentSegment) {
long startTimeMs = System.currentTimeMillis();
- // if seq num of current segment is 102, maxSequenceNumBelowCurrentSegment
must be highest seq num of any segment
- // created before current segment
- int maxSequenceNumBelowCurrentSegment = -1;
- String previousSegment = null;
- int currPartitionGroupId = currSegment.getPartitionGroupId();
- int currSequenceNum = currSegment.getSequenceNumber();
- Map<String, Map<String, String>> segmentAssignment =
getSegmentAssignment();
- String currentServerInstanceId =
_realtimeTableDataManager.getServerInstance();
-
- for (Map.Entry<String, Map<String, String>> entry :
segmentAssignment.entrySet()) {
- String segmentName = entry.getKey();
- Map<String, String> instanceStateMap = entry.getValue();
- String state = instanceStateMap.get(currentServerInstanceId);
-
- if
(!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) {
+ // Track the highest sequence number of any segment created before the
current segment. If there is none, return -1
+ // so that it can always pass the check.
+ int maxSequenceNumberBelowCurrentSegment = -1;
+ String instanceId = _realtimeTableDataManager.getServerInstance();
+ int partitionId = currentSegment.getPartitionGroupId();
+ int currentSequenceNumber = currentSegment.getSequenceNumber();
+
+ for (Map.Entry<String, Map<String, String>> entry :
getSegmentAssignment().entrySet()) {
+ String state = entry.getValue().get(instanceId);
+ if (!SegmentStateModel.ONLINE.equals(state)) {
// if server is looking for previous segment to current transition's
segment, it means the previous segment
// has to be online in the instance. If all previous segments are not
online, we just allow the current helix
// transition to go ahead.
continue;
}
- LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+ LLCSegmentName llcSegmentName = LLCSegmentName.of(entry.getKey());
if (llcSegmentName == null) {
// ignore uploaded segments
continue;
}
- if (llcSegmentName.getPartitionGroupId() != currPartitionGroupId) {
+ if (llcSegmentName.getPartitionGroupId() != partitionId) {
// ignore segments of different partitions.
continue;
}
- if (llcSegmentName.getSequenceNumber() >= currSequenceNum) {
- // ignore segments with higher sequence number than existing helix
transition segment.
- continue;
- }
-
- if (llcSegmentName.getSequenceNumber() >
maxSequenceNumBelowCurrentSegment) {
- maxSequenceNumBelowCurrentSegment = llcSegmentName.getSequenceNumber();
- // also track the name of segment
- previousSegment = segmentName;
+ int sequenceNumber = llcSegmentName.getSequenceNumber();
+ if (sequenceNumber > maxSequenceNumberBelowCurrentSegment &&
sequenceNumber < currentSequenceNumber) {
+ maxSequenceNumberBelowCurrentSegment = sequenceNumber;
}
}
long timeSpentMs = System.currentTimeMillis() - startTimeMs;
- LOGGER.info("Fetched previous segment: {} to current segment: {} in:
{}ms.", previousSegment,
- currSegment.getSegmentName(), timeSpentMs);
+ LOGGER.info("Fetched previous segment sequence number: {} to current
segment: {} in: {}ms.",
+ maxSequenceNumberBelowCurrentSegment, currentSegment.getSegmentName(),
timeSpentMs);
_serverMetrics.addTimedTableValue(_realtimeTableDataManager.getTableName(),
ServerTimer.PREV_SEGMENT_FETCH_IDEAL_STATE_TIME_MS, timeSpentMs,
TimeUnit.MILLISECONDS);
- return previousSegment;
+ return maxSequenceNumberBelowCurrentSegment;
}
@VisibleForTesting
Map<String, Map<String, String>> getSegmentAssignment() {
- IdealState idealState =
HelixHelper.getTableIdealState(_realtimeTableDataManager.getHelixManager(),
- _realtimeTableDataManager.getTableName());
- Preconditions.checkState(idealState != null, "Failed to find ideal state
for table: %s",
- _realtimeTableDataManager.getTableName());
+ String realtimeTableName = _realtimeTableDataManager.getTableName();
+ IdealState idealState =
+
HelixHelper.getTableIdealState(_realtimeTableDataManager.getHelixManager(),
realtimeTableName);
+ Preconditions.checkState(idealState != null, "Failed to find ideal state
for table: %s", realtimeTableName);
return idealState.getRecord().getMapFields();
}
@@ -299,26 +249,32 @@ public class ConsumerCoordinator {
return _firstTransitionProcessed;
}
- // this should not be used outside of tests.
@VisibleForTesting
- int getMaxSegmentSeqNumLoaded() {
- return _maxSegmentSeqNumRegistered;
+ int getMaxSequenceNumberRegistered() {
+ return _maxSequenceNumberRegistered;
}
- @VisibleForTesting
- boolean isSegmentAlreadyConsumed(String currSegmentName) {
- SegmentZKMetadata segmentZKMetadata =
_realtimeTableDataManager.fetchZKMetadata(currSegmentName);
- if (segmentZKMetadata == null) {
- // segment is deleted. no need to consume.
- LOGGER.warn("Skipping consumption for segment: {} because ZK metadata
does not exists.", currSegmentName);
- return true;
- }
+ private static void checkSegmentStatus(SegmentZKMetadata segmentZKMetadata)
+ throws ShouldNotConsumeException {
if (segmentZKMetadata.getStatus().isCompleted()) {
- // if segment is done or uploaded, no need to consume.
- LOGGER.warn("Skipping consumption for segment: {} because ZK status is
already marked as completed.",
- currSegmentName);
- return true;
+ throw new ShouldNotConsumeException(
+ "Segment: " + segmentZKMetadata.getSegmentName() + " is already
completed with status: "
+ + segmentZKMetadata.getStatus());
+ }
+ }
+
+ /**
+ * This exception is thrown when attempting to acquire the consumer
semaphore for a segment that should not be
+ * consumed anymore:
+ * - Segment is in completed status (DONE/UPLOADED)
+ * - Segment is deleted
+ *
+ * We allow consumption when segment is COMMITTING (for pauseless
consumption) because there is no guarantee that the
+ * segment will be committed soon. This way the slow server can still catch
up.
+ */
+ public static class ShouldNotConsumeException extends Exception {
+ public ShouldNotConsumeException(String message) {
+ super(message);
}
- return false;
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index a5ac3c2e95..0979782428 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -244,6 +244,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
private final int _segmentMaxRowCount;
private final String _resourceDataDir;
private final Schema _schema;
+ private final LLCSegmentName _llcSegmentName;
private final AtomicBoolean _streamConsumerClosed = new AtomicBoolean(false);
// Semaphore for each partitionGroupId only, which is to prevent two
different stream consumers
// from consuming with the same partitionGroupId in parallel in the same
host.
@@ -734,6 +735,16 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
} while (!_shouldStop && !_isReadyToConsumeData.getAsBoolean());
}
+ // Acquire semaphore before consuming data
+ try {
+ _consumerCoordinator.acquire(_llcSegmentName);
+ } catch (ConsumerCoordinator.ShouldNotConsumeException e) {
+ _segmentLogger.info("Skipping consumption because: {}",
e.getMessage());
+ return;
+ }
+ _consumerSemaphoreAcquired.set(true);
+ _consumerCoordinator.register(_llcSegmentName);
+
// TODO:
// When reaching here, the current consuming segment has already
acquired the consumer semaphore, but there is
// no guarantee that the previous consuming segment is already
persisted (replaced with immutable segment). It
@@ -1068,16 +1079,6 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
return _segmentBuildDescriptor;
}
- @VisibleForTesting
- Semaphore getPartitionGroupConsumerSemaphore() {
- return _consumerCoordinator.getSemaphore();
- }
-
- @VisibleForTesting
- AtomicBoolean getConsumerSemaphoreAcquired() {
- return _consumerSemaphoreAcquired;
- }
-
@VisibleForTesting
protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
if (_parallelSegmentConsumptionPolicy.isAllowedDuringBuild()) {
@@ -1569,6 +1570,8 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
_realtimeTableDataManager = realtimeTableDataManager;
_resourceDataDir = resourceDataDir;
_schema = schema;
+ _llcSegmentName = llcSegmentName;
+ _consumerCoordinator = consumerCoordinator;
_serverMetrics = serverMetrics;
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
_partitionDedupMetadataManager = partitionDedupMetadataManager;
@@ -1603,7 +1606,6 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
_segmentZKMetadata.getEndOffset() == null ? null
:
_streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getEndOffset()),
_segmentZKMetadata.getStatus().toString());
- _consumerCoordinator = consumerCoordinator;
InstanceDataManagerConfig instanceDataManagerConfig =
indexLoadingConfig.getInstanceDataManagerConfig();
String clientIdSuffix =
instanceDataManagerConfig != null ?
instanceDataManagerConfig.getConsumerClientIdSuffix() : null;
@@ -1694,16 +1696,6 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
throw e;
}
- // Acquire semaphore to create stream consumers
- try {
- _consumerCoordinator.acquire(llcSegmentName);
- _consumerSemaphoreAcquired.set(true);
- } catch (InterruptedException e) {
- String errorMsg = "InterruptedException when acquiring the
partitionConsumerSemaphore";
- _segmentLogger.error(errorMsg);
- throw new RuntimeException(errorMsg + " for segment: " +
_segmentNameStr);
- }
-
try {
_startOffset = _partitionGroupConsumptionStatus.getStartOffset();
_currentOffset = _streamPartitionMsgOffsetFactory.create(_startOffset);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index fdc74f3918..53ced97032 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -99,17 +99,16 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
private RealtimeSegmentStatsHistory _statsHistory;
private final Semaphore _segmentBuildSemaphore;
- // Maintains a map of partition id to semaphore.
+ // Maintains a map from partition id to consumer coordinator. The consumer
coordinator uses a semaphore to ensure that
+ // exactly one PartitionConsumer instance consumes from any stream partition.
+ // In some streams, it's possible that having multiple consumers (with the
same consumer name on the same host)
+ // consuming from the same stream partition can lead to bugs.
// We use semaphore of 1 permit instead of lock because the semaphore is
shared across multiple threads, and it can be
// released by a different thread than the one that acquired it. There is no
out-of-box Lock implementation that
// allows releasing the lock from a different thread.
- // The semaphore ensures that exactly one PartitionConsumer instance
consumes from any stream partition.
- // In some streams, it's possible that having multiple consumers (with the
same consumer name on the same host)
- // consuming from the same stream partition can lead to bugs.
- // The semaphores will stay in the hash map even if the consuming partitions
move to a different host.
- // We expect that there will be a small number of semaphores, but that may
be ok.
- private final Map<Integer, ConsumerCoordinator>
_partitionGroupIdToConsumerCoordinatorMap =
- new ConcurrentHashMap<>();
+ // The consumer coordinators will stay in the map even if the consuming
partitions move to a different server. We
+ // expect a small number of consumer coordinators, so it should be fine to
not remove them.
+ private final Map<Integer, ConsumerCoordinator>
_partitionIdToConsumerCoordinatorMap = new ConcurrentHashMap<>();
// The old name of the stats file used to be stats.ser which we changed when
we moved all packages
// from com.linkedin to org.apache because of not being able to deserialize
the old files using the newer classes
private static final String STATS_FILE_NAME = "segment-stats.ser";
@@ -474,23 +473,26 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
SegmentDataManager segmentDataManager =
_segmentDataManagerMap.get(segmentName);
if (segmentDataManager == null) {
addNewOnlineSegment(zkMetadata, indexLoadingConfig);
- return;
- }
- if (segmentDataManager instanceof RealtimeSegmentDataManager) {
+ } else if (segmentDataManager instanceof RealtimeSegmentDataManager) {
_logger.info("Changing segment: {} from CONSUMING to ONLINE",
segmentName);
((RealtimeSegmentDataManager)
segmentDataManager).goOnlineFromConsuming(zkMetadata);
onConsumingToOnline(segmentName);
- return;
- }
- // For pauseless ingestion, the segment is marked ONLINE before it's built
and before the COMMIT_END_METADATA
- // call completes.
- // The server should replace the segment only after the CRC is set by
COMMIT_END_METADATA and the segment is
- // marked DONE.
- // This ensures the segment's download URL is available before discarding
the locally built copy, preventing
- // data loss if COMMIT_END_METADATA fails.
- if (zkMetadata.getStatus() == Status.DONE) {
+ } else if (zkMetadata.getStatus() == Status.DONE) {
+ // For pauseless ingestion, the segment is marked ONLINE before it's
built and before the COMMIT_END_METADATA
+ // call completes.
+ // The server should replace the segment only after the CRC is set by
COMMIT_END_METADATA and the segment is
+ // marked DONE.
+ // This ensures the segment's download URL is available before
discarding the locally built copy, preventing
+ // data loss if COMMIT_END_METADATA fails.
replaceSegmentIfCrcMismatch(segmentDataManager, zkMetadata,
indexLoadingConfig);
}
+ // Register the segment into the consumer coordinator if consumption order
is enforced.
+ if (_enforceConsumptionInOrder) {
+ LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+ if (llcSegmentName != null) {
+
getConsumerCoordinator(llcSegmentName.getPartitionGroupId()).register(llcSegmentName);
+ }
+ }
}
@Override
@@ -516,11 +518,17 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
private void doAddConsumingSegment(String segmentName)
throws AttemptsExceededException, RetriableOperationException {
SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
- if ((!_enforceConsumptionInOrder) && ((zkMetadata == null) ||
(zkMetadata.getStatus().isCompleted()))) {
- // NOTE: We do not throw exception here because the segment might have
just been committed before the state
- // transition is processed. We can skip adding this segment, and
the segment will enter CONSUMING state in
- // Helix, then we can rely on the following CONSUMING -> ONLINE
state transition to add it.
- _logger.warn("Segment: {} is already consumed, skipping adding it as
CONSUMING segment", segmentName);
+ if (!_enforceConsumptionInOrder && zkMetadata.getStatus().isCompleted()) {
+ // NOTE:
+ // 1. When consumption order is enforced, we always create the
RealtimeSegmentDataManager to coordinate the
+ // consumption.
+ // 2. When segment is COMMITTING (for pauseless consumption), we still
create the RealtimeSegmentDataManager
+ // because there is no guarantee that the segment will be committed
soon. This way the slow server can still
+ // catch up.
+ // 3. We do not throw exception here because the segment might have just
been committed before the state
+ // transition is processed. We can skip adding this segment, and the
segment will enter CONSUMING state in
+ // Helix, then we can rely on the following CONSUMING -> ONLINE state
transition to add it.
+ _logger.warn("Segment: {} is already completed, skipping adding it as
CONSUMING segment", segmentName);
return;
}
IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
@@ -557,22 +565,10 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
PartitionDedupMetadataManager partitionDedupMetadataManager =
_tableDedupMetadataManager != null ?
_tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
: null;
- RealtimeSegmentDataManager realtimeSegmentDataManager;
- try {
- realtimeSegmentDataManager =
- createRealtimeSegmentDataManager(zkMetadata, tableConfig,
indexLoadingConfig, schema, llcSegmentName,
- consumerCoordinator, partitionUpsertMetadataManager,
partitionDedupMetadataManager,
- _isTableReadyToConsumeData);
- } catch (SegmentAlreadyConsumedException e) {
- // Don't register segment.
- // If segment is not deleted, Eventually this server should receive a
CONSUMING -> ONLINE helix state transition.
- // If consumption in order is enforced:
- // 1. If segment was deleted: Helix thread waiting on this deleted
segment will fallback to fetch prev segment
- // from ideal state.
- // 2. If segment is not deleted, Helix thread waiting on this segment
will be notified and unblocked during
- // consuming -> online transition of this segment.
- return;
- }
+ RealtimeSegmentDataManager realtimeSegmentDataManager =
+ createRealtimeSegmentDataManager(zkMetadata, tableConfig,
indexLoadingConfig, schema, llcSegmentName,
+ consumerCoordinator, partitionUpsertMetadataManager,
partitionDedupMetadataManager,
+ _isTableReadyToConsumeData);
registerSegment(segmentName, realtimeSegmentDataManager,
partitionUpsertMetadataManager);
if (partitionUpsertMetadataManager != null) {
partitionUpsertMetadataManager.trackNewlyAddedSegment(segmentName);
@@ -821,21 +817,6 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
registerSegment(segmentName, segmentDataManager);
}
- @Override
- protected SegmentDataManager registerSegment(String segmentName,
SegmentDataManager segmentDataManager) {
- SegmentDataManager oldSegmentDataManager =
super.registerSegment(segmentName, segmentDataManager);
- if (_enforceConsumptionInOrder) {
- // helix threads might be waiting for their respective previous segments
to be loaded.
- // they need to be notified here.
- LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
- if (llcSegmentName != null) {
- ConsumerCoordinator consumerCoordinator =
getConsumerCoordinator(llcSegmentName.getPartitionGroupId());
- consumerCoordinator.trackSegment(llcSegmentName);
- }
- }
- return oldSegmentDataManager;
- }
-
/**
* Replaces the CONSUMING segment with a downloaded committed one.
*/
@@ -892,8 +873,8 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
}
@VisibleForTesting
- ConsumerCoordinator getConsumerCoordinator(int partitionGroupId) {
- return
_partitionGroupIdToConsumerCoordinatorMap.computeIfAbsent(partitionGroupId,
+ ConsumerCoordinator getConsumerCoordinator(int partitionId) {
+ return _partitionIdToConsumerCoordinatorMap.computeIfAbsent(partitionId,
k -> new ConsumerCoordinator(_enforceConsumptionInOrder, this));
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentAlreadyConsumedException.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentAlreadyConsumedException.java
deleted file mode 100644
index 6b04920dff..0000000000
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentAlreadyConsumedException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.manager.realtime;
-
-public class SegmentAlreadyConsumedException extends RuntimeException {
-
- public SegmentAlreadyConsumedException(String currSegmentName) {
- super("Skipping consumption for segment: " + currSegmentName);
- }
-}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java
index 1d8ec5bc0a..775ecffc3e 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java
@@ -25,16 +25,15 @@ import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
-import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
+// TODO: Replace the sleep in this test to condition wait
public class ConsumerCoordinatorTest {
private static class FakeRealtimeTableDataManager extends
RealtimeTableDataManager {
@@ -98,15 +97,10 @@ public class ConsumerCoordinatorTest {
public Map<String, Map<String, String>> getSegmentAssignment() {
return _segmentAssignmentMap;
}
-
- @Override
- public boolean isSegmentAlreadyConsumed(String currSegmentName) {
- return false;
- }
}
@Test
- public void testAwaitForPreviousSegmentSequenceNumber()
+ public void testWaitForPreviousSegment()
throws InterruptedException {
// 1. enable tracking segment seq num.
FakeRealtimeTableDataManager realtimeTableDataManager = new
FakeRealtimeTableDataManager(null, false);
@@ -117,40 +111,24 @@ public class ConsumerCoordinatorTest {
// 2. check if thread waits on prev segment seq
AtomicBoolean atomicBoolean = new AtomicBoolean(false);
Thread thread1 = new Thread(() -> {
- LLCSegmentName llcSegmentName = getLLCSegment(101);
+ LLCSegmentName currentSegment = getLLCSegment(101);
try {
- boolean b =
consumerCoordinator.awaitForPreviousSegmentSequenceNumber(llcSegmentName, 5000);
- atomicBoolean.set(b);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ consumerCoordinator.waitForPreviousSegment(currentSegment, 100);
+ atomicBoolean.set(true);
+ } catch (Exception e) {
+ Assert.fail();
}
});
thread1.start();
+ Thread.sleep(1000);
+ Assert.assertFalse(atomicBoolean.get());
// 3. add prev segment and check if thread is unblocked.
- consumerCoordinator.trackSegment(getLLCSegment(100));
+ consumerCoordinator.register(getLLCSegment(100));
- TestUtils.waitForCondition(aVoid -> atomicBoolean.get(), 4000,
+ TestUtils.waitForCondition(aVoid -> atomicBoolean.get(), 5000,
"Thread waiting on previous segment should have been unblocked.");
- Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 100);
-
- // 4. check if second thread waits on prev segment seq until timeout and
returns false
- AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
- Thread thread2 = new Thread(() -> {
- LLCSegmentName llcSegmentName = getLLCSegment(102);
- try {
- boolean b =
consumerCoordinator.awaitForPreviousSegmentSequenceNumber(llcSegmentName, 500);
- atomicBoolean2.set(b);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- });
- thread2.start();
-
- Thread.sleep(1500);
-
- Assert.assertFalse(atomicBoolean2.get());
- Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 100);
+ Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(),
100);
}
@Test
@@ -185,15 +163,15 @@ public class ConsumerCoordinatorTest {
Thread.sleep(1000);
// 3. load segment 100, 101, 102
- realtimeTableDataManager.registerSegment(getSegmentName(100),
mockedRealtimeSegmentDataManager);
- realtimeTableDataManager.registerSegment(getSegmentName(101),
mockedRealtimeSegmentDataManager);
- realtimeTableDataManager.registerSegment(getSegmentName(102),
mockedRealtimeSegmentDataManager);
+ consumerCoordinator.register(getLLCSegment(100));
+ consumerCoordinator.register(getLLCSegment(101));
+ consumerCoordinator.register(getLLCSegment(102));
Thread.sleep(1000);
// 4. check all of the above threads wait
Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
1);
Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
- Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102);
+ Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(),
102);
Assert.assertFalse(consumerCoordinator.getFirstTransitionProcessed().get());
thread2.start();
@@ -205,27 +183,27 @@ public class ConsumerCoordinatorTest {
// 5. check that first thread acquiring semaphore is of segment 104
Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
- Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102);
+ Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(),
102);
Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
- realtimeTableDataManager.registerSegment(getSegmentName(104),
mockedRealtimeSegmentDataManager);
+ consumerCoordinator.register(getLLCSegment(104));
Thread.sleep(1000);
Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
- Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 104);
+ Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(),
104);
Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(),
1);
// 6. check the next threads acquiring semaphore is 106
consumerCoordinator.getSemaphore().release();
- realtimeTableDataManager.registerSegment(getSegmentName(106),
mockedRealtimeSegmentDataManager);
+ consumerCoordinator.register(getLLCSegment(106));
Thread.sleep(1000);
Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
- Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 106);
+ Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(),
106);
Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(),
1);
}
@@ -249,26 +227,24 @@ public class ConsumerCoordinatorTest {
Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
1);
Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
- Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), -1);
+ Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(),
-1);
Assert.assertFalse(consumerCoordinator.getFirstTransitionProcessed().get());
- RealtimeSegmentDataManager mockedRealtimeSegmentDataManager =
getMockedRealtimeSegmentDataManager();
-
// 3. register older segment and check seq num watermark and semaphore.
- realtimeTableDataManager.registerSegment(getSegmentName(90),
mockedRealtimeSegmentDataManager);
+ consumerCoordinator.register(getLLCSegment(90));
Thread.sleep(1000);
Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
1);
Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
- Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 90);
+ Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(),
90);
Assert.assertFalse(consumerCoordinator.getFirstTransitionProcessed().get());
// 4. register prev segment and check watermark and if thread was unblocked
- realtimeTableDataManager.registerSegment(getSegmentName(91),
mockedRealtimeSegmentDataManager);
+ consumerCoordinator.register(getLLCSegment(91));
Thread.sleep(1000);
Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
- Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 91);
+ Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(),
91);
Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
// 5. check that all the following transitions rely on seq num watermark
and gets blocked.
@@ -283,7 +259,7 @@ public class ConsumerCoordinatorTest {
Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
- Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 91);
+ Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(),
91);
Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
// 6. check that all above threads are still blocked even if semaphore is
released.
@@ -293,25 +269,25 @@ public class ConsumerCoordinatorTest {
Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
1);
Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
- Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 91);
+ Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(),
91);
Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
// 6. mark 101 seg as complete. Check 102 acquired the semaphore.
- realtimeTableDataManager.registerSegment(getSegmentName(101),
mockedRealtimeSegmentDataManager);
+ consumerCoordinator.register(getLLCSegment(101));
Thread.sleep(1000);
Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
- Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 101);
+ Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(),
101);
Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(),
0);
// 7. register 102 seg, check if seg 103 is waiting on semaphore.
- realtimeTableDataManager.registerSegment(getSegmentName(102),
mockedRealtimeSegmentDataManager);
+ consumerCoordinator.register(getLLCSegment(102));
Thread.sleep(1000);
Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
- Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102);
+ Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(),
102);
Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(),
1);
@@ -320,16 +296,16 @@ public class ConsumerCoordinatorTest {
consumerCoordinator.getSemaphore().release();
Thread.sleep(1000);
Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
- Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102);
+ Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(),
102);
Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(),
0);
// 8. register 103 seg and check if seg 104 is now queued on semaphore
- realtimeTableDataManager.registerSegment(getSegmentName(103),
mockedRealtimeSegmentDataManager);
+ consumerCoordinator.register(getLLCSegment(103));
Thread.sleep(1000);
Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
- Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 103);
+ Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(),
103);
Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(),
1);
}
@@ -354,14 +330,14 @@ public class ConsumerCoordinatorTest {
Assert.assertNotNull(realtimeTableDataManager);
// prev segment has seq 91, so registering seq 90 won't do anything.
- realtimeTableDataManager.registerSegment(getSegmentName(90),
mockedRealtimeSegmentDataManager);
+ consumerCoordinator.register(getLLCSegment(90));
Thread.sleep(2000);
Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
1);
// 2. test that registering prev segment will unblock thread.
- realtimeTableDataManager.registerSegment(getSegmentName(91),
mockedRealtimeSegmentDataManager);
+ consumerCoordinator.register(getLLCSegment(91));
TestUtils.waitForCondition(aVoid ->
(consumerCoordinator.getSemaphore().availablePermits() == 0), 5000,
"Semaphore must be acquired after registering previous segment");
@@ -372,8 +348,8 @@ public class ConsumerCoordinatorTest {
Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
1);
Assert.assertFalse(consumerCoordinator.getSemaphore().hasQueuedThreads());
Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
- Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), -1);
- realtimeTableDataManager.registerSegment(getSegmentName(101),
mockedRealtimeSegmentDataManager);
+ Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(),
91);
+ consumerCoordinator.register(getLLCSegment(101));
// 3. test that segment 103 will be blocked.
Map<String, String> serverSegmentStatusMap = new HashMap<>() {{
@@ -401,7 +377,7 @@ public class ConsumerCoordinatorTest {
Assert.assertFalse(consumerCoordinator.getSemaphore().hasQueuedThreads());
// 4. registering seg 102 should unblock seg 103
- realtimeTableDataManager.registerSegment(getSegmentName(102),
mockedRealtimeSegmentDataManager);
+ consumerCoordinator.register(getLLCSegment(102));
Thread.sleep(1000);
@@ -416,12 +392,12 @@ public class ConsumerCoordinatorTest {
Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(),
0);
Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(),
0);
Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
- Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), -1);
+ Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(),
102);
}
@Test
public void testRandomOrder()
- throws InterruptedException {
+ throws Exception {
RealtimeTableDataManager realtimeTableDataManager =
Mockito.mock(RealtimeTableDataManager.class);
Mockito.when(realtimeTableDataManager.getTableName()).thenReturn("tableTest_REALTIME");
@@ -450,51 +426,25 @@ public class ConsumerCoordinatorTest {
String segmentName = "tableTest_REALTIME__1__101__20250304T0035Z";
LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
Assert.assertNotNull(llcSegmentName);
- String previousSegment =
consumerCoordinator.getPreviousSegmentFromIdealState(llcSegmentName);
- Assert.assertEquals(previousSegment,
"tableTest_REALTIME__1__91__20250304T0035Z");
+ int previousSegmentSequenceNumber =
+
consumerCoordinator.getPreviousSegmentSequenceNumberFromIdealState(llcSegmentName);
+ Assert.assertEquals(previousSegmentSequenceNumber, 91);
consumerCoordinator.getSegmentAssignment().clear();
Map<String, String> serverSegmentStatusMap = new HashMap<>() {{
put("server_3", "ONLINE");
}};
consumerCoordinator.getSegmentAssignment().put(getSegmentName(100),
serverSegmentStatusMap);
- previousSegment =
consumerCoordinator.getPreviousSegmentFromIdealState(llcSegmentName);
- Assert.assertNull(previousSegment);
- }
-
- @Test
- public void testIfSegmentIsConsumed() {
- RealtimeTableDataManager realtimeTableDataManager =
Mockito.mock(RealtimeTableDataManager.class);
-
Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(null);
-
- ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(true,
realtimeTableDataManager);
-
Assert.assertTrue(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101)));
-
- SegmentZKMetadata mockSegmentZKMetadata =
Mockito.mock(SegmentZKMetadata.class);
-
-
Mockito.when(mockSegmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
-
Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(mockSegmentZKMetadata);
-
Assert.assertFalse(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101)));
-
-
Mockito.when(mockSegmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.COMMITTING);
-
Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(mockSegmentZKMetadata);
-
Assert.assertFalse(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101)));
-
-
Mockito.when(mockSegmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.DONE);
-
Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(mockSegmentZKMetadata);
-
Assert.assertTrue(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101)));
-
-
Mockito.when(mockSegmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.UPLOADED);
-
Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(mockSegmentZKMetadata);
-
Assert.assertTrue(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101)));
+ previousSegmentSequenceNumber =
consumerCoordinator.getPreviousSegmentSequenceNumberFromIdealState(llcSegmentName);
+ Assert.assertEquals(previousSegmentSequenceNumber, -1);
}
private Thread getNewThread(FakeConsumerCoordinator consumerCoordinator,
LLCSegmentName llcSegmentName) {
return new Thread(() -> {
try {
consumerCoordinator.acquire(llcSegmentName);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ } catch (Exception e) {
+ Assert.fail();
}
}, String.valueOf(llcSegmentName.getSequenceNumber()));
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index a332818164..8bf32c576b 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -28,10 +28,8 @@ import java.time.Instant;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;
import javax.annotation.Nullable;
@@ -70,7 +68,6 @@ import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -750,58 +747,6 @@ public class RealtimeSegmentDataManagerTest {
segmentDataManager.close();
}
- @Test
- public void testOnlyOneSegmentHoldingTheSemaphoreForParticularPartition()
- throws Exception {
- long timeout = 10_000L;
- FakeRealtimeSegmentDataManager firstSegmentDataManager =
createFakeSegmentManager();
-
Assert.assertTrue(firstSegmentDataManager.getConsumerSemaphoreAcquired().get());
- Semaphore firstSemaphore =
firstSegmentDataManager.getPartitionGroupConsumerSemaphore();
- Assert.assertEquals(firstSemaphore.availablePermits(), 0);
- Assert.assertFalse(firstSemaphore.hasQueuedThreads());
-
- AtomicReference<FakeRealtimeSegmentDataManager> secondSegmentDataManager =
new AtomicReference<>(null);
-
- // Construct the second segment manager, which will be blocked on the
semaphore.
- Thread constructSecondSegmentManager = new Thread(() -> {
- try {
- secondSegmentDataManager.set(createFakeSegmentManager());
- } catch (Exception e) {
- throw new RuntimeException("Exception when sleeping for " + timeout +
"ms", e);
- }
- });
- constructSecondSegmentManager.start();
-
- // Wait until the second segment manager gets blocked on the semaphore.
- TestUtils.waitForCondition(aVoid -> {
- if (firstSemaphore.hasQueuedThreads()) {
- // Once verified the second segment gets blocked, release the
semaphore.
- firstSegmentDataManager.close();
- return true;
- } else {
- return false;
- }
- }, timeout, "Failed to wait for the second segment blocked on semaphore");
-
- // Wait for the second segment manager finished the construction.
- TestUtils.waitForCondition(aVoid -> secondSegmentDataManager.get() !=
null, timeout,
- "Failed to acquire the semaphore for the second segment manager in " +
timeout + "ms");
-
-
Assert.assertTrue(secondSegmentDataManager.get().getConsumerSemaphoreAcquired().get());
- Semaphore secondSemaphore =
secondSegmentDataManager.get().getPartitionGroupConsumerSemaphore();
- Assert.assertEquals(firstSemaphore, secondSemaphore);
- Assert.assertEquals(secondSemaphore.availablePermits(), 0);
- Assert.assertFalse(secondSemaphore.hasQueuedThreads());
-
- // Call offload method the 2nd time on the first segment manager, the
permits in semaphore won't increase.
- firstSegmentDataManager.close();
-
Assert.assertEquals(firstSegmentDataManager.getPartitionGroupConsumerSemaphore().availablePermits(),
0);
-
- // The permit finally gets released in the Semaphore.
- secondSegmentDataManager.get().close();
-
Assert.assertEquals(secondSegmentDataManager.get().getPartitionGroupConsumerSemaphore().availablePermits(),
1);
- }
-
@Test
public void
testShutdownTableDataManagerWillNotShutdownLeaseExtenderExecutor()
throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]