This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3aeae40e01 Ensure stream consumer is only closed once (#15372)
3aeae40e01 is described below
commit 3aeae40e01e4a5492cf905f3400c8d04174da000
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Mar 27 07:26:17 2025 -0600
Ensure stream consumer is only closed once (#15372)
---
.../realtime/RealtimeSegmentDataManager.java | 38 ++++++++++++----------
.../realtime/RealtimeSegmentDataManagerTest.java | 4 +--
2 files changed, 23 insertions(+), 19 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 9011318891..730055a89c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -244,6 +244,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
private final int _segmentMaxRowCount;
private final String _resourceDataDir;
private final Schema _schema;
+ private final AtomicBoolean _streamConsumerClosed = new AtomicBoolean(false);
// Semaphore for each partitionGroupId only, which is to prevent two
different stream consumers
// from consuming with the same partitionGroupId in parallel in the same
host.
// See the comments in {@link RealtimeTableDataManager}.
@@ -252,7 +253,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
// This boolean is needed because the semaphore is shared by threads; every
thread holding this semaphore can
// modify the permit. This boolean makes sure the semaphore gets released
only once when the partition group stops
// consuming.
- private final AtomicBoolean _acquiredConsumerSemaphore;
+ private final AtomicBoolean _consumerSemaphoreAcquired = new
AtomicBoolean(false);
private final ServerMetrics _serverMetrics;
private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
@@ -1070,14 +1071,14 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
}
@VisibleForTesting
- AtomicBoolean getAcquiredConsumerSemaphore() {
- return _acquiredConsumerSemaphore;
+ AtomicBoolean getConsumerSemaphoreAcquired() {
+ return _consumerSemaphoreAcquired;
}
@VisibleForTesting
protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
if (_parallelSegmentConsumptionPolicy.isAllowedDuringBuild()) {
- closeStreamConsumers();
+ closeStreamConsumer();
}
// Do not allow building segment when table data manager is already shut
down
if (_realtimeTableDataManager.isShutDown()) {
@@ -1276,12 +1277,11 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
return true;
}
- private void closeStreamConsumers() {
- closePartitionGroupConsumer();
- closePartitionMetadataProvider();
- if (_acquiredConsumerSemaphore.compareAndSet(true, false)) {
- _segmentLogger.info("Releasing the consumer semaphore");
- _consumerCoordinator.release();
+ private void closeStreamConsumer() {
+ if (_streamConsumerClosed.compareAndSet(false, true)) {
+ closePartitionGroupConsumer();
+ closePartitionMetadataProvider();
+ releaseConsumerSemaphore();
}
}
@@ -1303,6 +1303,13 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
}
}
+ private void releaseConsumerSemaphore() {
+ if (_consumerSemaphoreAcquired.compareAndSet(true, false)) {
+ _segmentLogger.info("Releasing consumer semaphore");
+ _consumerCoordinator.release();
+ }
+ }
+
/**
* Cleans up the metrics that reflects the state of the realtime segment.
* This step is essential as the instance may not be the target location for
some of the partitions.
@@ -1461,7 +1468,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
protected void downloadSegmentAndReplace(SegmentZKMetadata segmentZKMetadata)
throws Exception {
if (_parallelSegmentConsumptionPolicy.isAllowedDuringDownload()) {
- closeStreamConsumers();
+ closeStreamConsumer();
}
_realtimeTableDataManager.downloadAndReplaceConsumingSegment(segmentZKMetadata);
}
@@ -1500,7 +1507,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
} catch (Exception e) {
_segmentLogger.error("Caught exception while stopping the consumer
thread", e);
}
- closeStreamConsumers();
+ closeStreamConsumer();
cleanupMetrics();
_realtimeSegment.offload();
}
@@ -1586,7 +1593,6 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
:
_streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getEndOffset()),
_segmentZKMetadata.getStatus().toString());
_consumerCoordinator = consumerCoordinator;
- _acquiredConsumerSemaphore = new AtomicBoolean(false);
InstanceDataManagerConfig instanceDataManagerConfig =
indexLoadingConfig.getInstanceDataManagerConfig();
String clientIdSuffix =
instanceDataManagerConfig != null ?
instanceDataManagerConfig.getConsumerClientIdSuffix() : null;
@@ -1680,7 +1686,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
// Acquire semaphore to create stream consumers
try {
_consumerCoordinator.acquire(llcSegmentName);
- _acquiredConsumerSemaphore.set(true);
+ _consumerSemaphoreAcquired.set(true);
} catch (InterruptedException e) {
String errorMsg = "InterruptedException when acquiring the
partitionConsumerSemaphore";
_segmentLogger.error(errorMsg);
@@ -1710,9 +1716,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
// In case of exception thrown here, segment goes to ERROR state. Then
any attempt to reset the segment from
// ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the
semaphore is acquired, but not released.
// Hence releasing the semaphore here to unblock reset operation via
Helix Admin.
- _segmentLogger.info("Releasing the consumer semaphore");
- _consumerCoordinator.release();
- _acquiredConsumerSemaphore.set(false);
+ releaseConsumerSemaphore();
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new
SegmentErrorInfo(now(),
"Failed to initialize segment data manager", t));
_segmentLogger.warn(
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 72dbf26acb..a332818164 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -755,7 +755,7 @@ public class RealtimeSegmentDataManagerTest {
throws Exception {
long timeout = 10_000L;
FakeRealtimeSegmentDataManager firstSegmentDataManager =
createFakeSegmentManager();
-
Assert.assertTrue(firstSegmentDataManager.getAcquiredConsumerSemaphore().get());
+
Assert.assertTrue(firstSegmentDataManager.getConsumerSemaphoreAcquired().get());
Semaphore firstSemaphore =
firstSegmentDataManager.getPartitionGroupConsumerSemaphore();
Assert.assertEquals(firstSemaphore.availablePermits(), 0);
Assert.assertFalse(firstSemaphore.hasQueuedThreads());
@@ -787,7 +787,7 @@ public class RealtimeSegmentDataManagerTest {
TestUtils.waitForCondition(aVoid -> secondSegmentDataManager.get() !=
null, timeout,
"Failed to acquire the semaphore for the second segment manager in " +
timeout + "ms");
-
Assert.assertTrue(secondSegmentDataManager.get().getAcquiredConsumerSemaphore().get());
+
Assert.assertTrue(secondSegmentDataManager.get().getConsumerSemaphoreAcquired().get());
Semaphore secondSemaphore =
secondSegmentDataManager.get().getPartitionGroupConsumerSemaphore();
Assert.assertEquals(firstSemaphore, secondSemaphore);
Assert.assertEquals(secondSemaphore.availablePermits(), 0);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]