This is an automated email from the ASF dual-hosted git repository.
KKcorps pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d0ead2f626c Skip local build on CRC mismatch during CONSUMING->ONLINE
transition (#18895)
d0ead2f626c is described below
commit d0ead2f626c27034b9b0316721c96ac5c5367ddb
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Thu Jul 2 20:13:30 2026 -0700
Skip local build on CRC mismatch during CONSUMING->ONLINE transition
(#18895)
---
.../realtime/RealtimeSegmentDataManager.java | 48 +++++-
.../realtime/RealtimeSegmentDataManagerTest.java | 185 ++++++++++++++++++++-
2 files changed, 229 insertions(+), 4 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 0fbe75597d4..b060f2448a5 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -52,6 +52,7 @@ import
org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.PauselessConsumptionUtils;
import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.core.data.manager.BaseTableDataManager;
import org.apache.pinot.core.data.manager.SegmentOperationsTaskContext;
import org.apache.pinot.core.data.manager.SegmentOperationsTaskType;
import
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.ConsumptionRateLimiter;
@@ -72,6 +73,7 @@ import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
@@ -1396,6 +1398,21 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
protected boolean buildSegmentAndReplace()
throws Exception {
+ return buildSegmentAndReplace(null);
+ }
+
+ /// Builds the segment from the in-memory rows and replaces the CONSUMING
segment with the local copy.
+ ///
+ /// A locally-built segment can diverge in CRC from the one the winning
replica committed (e.g. different docId
+ /// ordering); swapping in a divergent copy leaves replicas inconsistent and,
for upsert tables, corrupts upsert metadata. So
+ /// when `committedSegmentZKMetadata` carries a CRC, a mismatch discards the
local build (returns `false`) and the
+ /// caller downloads the committed segment. Skipped for pauseless tables and
segments whose CRC is not yet set (the
+ /// COMMITTING window), so we never download a not-yet-uploaded segment (see
PR #17885).
+ ///
+ /// @param committedSegmentZKMetadata committed ZK metadata carrying the
CRC, or `null` to skip the check
+ /// @return `true` if built and replaced locally; `false` if the build
failed or was rejected on a CRC mismatch
+ protected boolean buildSegmentAndReplace(@Nullable SegmentZKMetadata
committedSegmentZKMetadata)
+ throws Exception {
SegmentBuildDescriptor descriptor;
try {
descriptor = buildSegmentInternal(false);
@@ -1405,10 +1422,34 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
if (descriptor == null) {
return false;
}
- _realtimeTableDataManager.replaceConsumingSegment(_segmentNameStr,
_segmentZKMetadata);
+ boolean crcCheckEnabled = committedSegmentZKMetadata != null &&
committedSegmentZKMetadata.getCrc() >= 0
+ && !PauselessConsumptionUtils.isPauselessEnabled(_tableConfig);
+ if (crcCheckEnabled &&
!isLocalSegmentCrcMatchingZk(committedSegmentZKMetadata)) {
+ _segmentLogger.warn("Locally-built segment: {} CRC does not match
committed CRC: {} in zk. "
+ + "Skipping local build to replace", _segmentNameStr,
committedSegmentZKMetadata.getCrc());
+ return false;
+ }
+ // If the CRC guard ran (and passed), use the committed metadata; otherwise
the construction-time metadata.
+ _realtimeTableDataManager.replaceConsumingSegment(_segmentNameStr,
+ crcCheckEnabled ? committedSegmentZKMetadata : _segmentZKMetadata);
return true;
}
+ /// Whether the just-built local segment's CRC matches the committed CRC in
ZK. Returns `false` if the local metadata
+ /// cannot be read, so the caller downloads the committed segment rather
than failing the transition.
+ @VisibleForTesting
+ protected boolean isLocalSegmentCrcMatchingZk(SegmentZKMetadata
committedSegmentZKMetadata) {
+ File indexDir = new File(_resourceDataDir, _segmentNameStr);
+ try {
+ SegmentMetadataImpl localMetadata = new SegmentMetadataImpl(indexDir);
+ return BaseTableDataManager.hasSameCRC(committedSegmentZKMetadata,
localMetadata);
+ } catch (Exception e) {
+ _segmentLogger.warn("Failed to read CRC of locally-built segment: {};
treating as CRC mismatch to download the "
+ + "committed segment", _segmentNameStr, e);
+ return false;
+ }
+ }
+
private void closeStreamConsumer() {
if (_streamConsumerClosed.compareAndSet(false, true)) {
closePartitionGroupConsumer();
@@ -1609,7 +1650,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
} else if (_currentOffset.compareTo(endOffset) == 0) {
_segmentLogger.info("Current offset {} matches offset in zk {}.
Replacing segment", _currentOffset,
endOffset);
- if (!buildSegmentAndReplace()) {
+ if (!buildSegmentAndReplace(segmentZKMetadata)) {
_segmentLogger.warn("Failed to build the segment: {} and
replace. Downloading to replace",
_segmentNameStr);
downloadSegmentAndReplace(segmentZKMetadata);
@@ -1630,7 +1671,8 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
if (success) {
_segmentLogger.info("Caught up to offset {}", _currentOffset);
- if (!buildSegmentAndReplace()) {
+ // After catching up, the offset matches ZK, so apply the same CRC
guard as in the matched-offset case.
+ if (!buildSegmentAndReplace(segmentZKMetadata)) {
_segmentLogger.warn("Failed to build the segment: {} after
catchup. Downloading to replace",
_segmentNameStr);
downloadSegmentAndReplace(segmentZKMetadata);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 41fb745acd4..92a6dad4763 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -26,6 +26,7 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Instant;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -48,13 +49,19 @@ import
org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.segment.local.segment.creator.Fixtures;
+import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.local.utils.ServerReloadJobStatusCache;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.stream.LongMsgOffset;
@@ -70,6 +77,9 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
@@ -600,6 +610,159 @@ public class RealtimeSegmentDataManagerTest {
}
}
+ @Test
+ public void testOnlineTransitionSkipsLocalBuildOnCrcMismatch()
+ throws Exception {
+ long finalOffsetValue = START_OFFSET_VALUE + 600;
+
+ // Upsert + committed CRC set + local CRC mismatch -> discard local build,
download the committed segment.
+ try (FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager()) {
+
when(segmentDataManager._tableDataManager.isUpsertEnabled()).thenReturn(true);
+ runGoOnlineForCrcGuard(segmentDataManager, crcMetadata(finalOffsetValue,
12345L), false, finalOffsetValue);
+ Assert.assertTrue(segmentDataManager._buildAndReplaceCalled);
+ Assert.assertTrue(segmentDataManager._downloadAndReplaceCalled);
+ }
+
+ // Non-upsert + committed CRC set + local CRC mismatch -> discard local
build, download the committed segment.
+ try (FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager()) {
+
when(segmentDataManager._tableDataManager.isUpsertEnabled()).thenReturn(false);
+ runGoOnlineForCrcGuard(segmentDataManager, crcMetadata(finalOffsetValue,
12345L), false, finalOffsetValue);
+ Assert.assertTrue(segmentDataManager._buildAndReplaceCalled);
+ Assert.assertTrue(segmentDataManager._downloadAndReplaceCalled);
+ }
+
+ // Upsert + committed CRC set + local CRC match -> build locally, no
download, swap in committed metadata.
+ try (FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager()) {
+
when(segmentDataManager._tableDataManager.isUpsertEnabled()).thenReturn(true);
+ SegmentZKMetadata committedMetadata = crcMetadata(finalOffsetValue,
12345L);
+ runGoOnlineForCrcGuard(segmentDataManager, committedMetadata, true,
finalOffsetValue);
+ Assert.assertTrue(segmentDataManager._buildAndReplaceCalled);
+ Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
+ verify(segmentDataManager._tableDataManager)
+ .replaceConsumingSegment(eq(SEGMENT_NAME_STR),
same(committedMetadata));
+ }
+
+ // Non-upsert + committed CRC set + local CRC match -> build locally, no
download, swap in committed metadata.
+ try (FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager()) {
+
when(segmentDataManager._tableDataManager.isUpsertEnabled()).thenReturn(false);
+ SegmentZKMetadata committedMetadata = crcMetadata(finalOffsetValue,
12345L);
+ runGoOnlineForCrcGuard(segmentDataManager, committedMetadata, true,
finalOffsetValue);
+ Assert.assertTrue(segmentDataManager._buildAndReplaceCalled);
+ Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
+ verify(segmentDataManager._tableDataManager)
+ .replaceConsumingSegment(eq(SEGMENT_NAME_STR),
same(committedMetadata));
+ }
+
+ // Committed CRC unset (-1) -> guard skipped, build locally with
construction-time metadata.
+ try (FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager()) {
+
when(segmentDataManager._tableDataManager.isUpsertEnabled()).thenReturn(false);
+ SegmentZKMetadata committedMetadata = crcMetadata(finalOffsetValue, -1L);
+ runGoOnlineForCrcGuard(segmentDataManager, committedMetadata, false,
finalOffsetValue);
+ Assert.assertTrue(segmentDataManager._buildAndReplaceCalled);
+ Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
+ verify(segmentDataManager._tableDataManager)
+ .replaceConsumingSegment(eq(SEGMENT_NAME_STR), argThat(m -> m !=
committedMetadata));
+ }
+
+ // Pauseless table -> guard skipped, build locally with construction-time
metadata.
+ try (FakeRealtimeSegmentDataManager segmentDataManager =
+ createFakeSegmentManager(false, new TimeSupplier(), null, null,
createPauselessTableConfig())) {
+
when(segmentDataManager._tableDataManager.isUpsertEnabled()).thenReturn(false);
+ SegmentZKMetadata committedMetadata = crcMetadata(finalOffsetValue,
12345L);
+ runGoOnlineForCrcGuard(segmentDataManager, committedMetadata, false,
finalOffsetValue);
+ Assert.assertTrue(segmentDataManager._buildAndReplaceCalled);
+ Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
+ verify(segmentDataManager._tableDataManager)
+ .replaceConsumingSegment(eq(SEGMENT_NAME_STR), argThat(m -> m !=
committedMetadata));
+ }
+ }
+
+ private void runGoOnlineForCrcGuard(FakeRealtimeSegmentDataManager
segmentDataManager, SegmentZKMetadata metadata,
+ boolean localCrcMatchesZk, long currentOffsetValue)
+ throws Exception {
+ segmentDataManager._useRealBuildAndReplace = true;
+ segmentDataManager._localSegmentCrcMatchesZk = localCrcMatchesZk;
+ segmentDataManager.getConsumerSemaphoreAcquired().set(true);
+ segmentDataManager._stopWaitTimeMs = 0;
+ segmentDataManager._state.set(segmentDataManager,
RealtimeSegmentDataManager.State.HOLDING);
+ segmentDataManager.setCurrentOffset(currentOffsetValue);
+ segmentDataManager.goOnlineFromConsuming(metadata);
+ }
+
+ private SegmentZKMetadata crcMetadata(long endOffsetValue, long crc) {
+ SegmentZKMetadata metadata = new SegmentZKMetadata(SEGMENT_NAME_STR);
+ metadata.setEndOffset(new LongMsgOffset(endOffsetValue).toString());
+ if (crc >= 0) {
+ metadata.setCrc(crc);
+ }
+ return metadata;
+ }
+
+ private TableConfig createPauselessTableConfig()
+ throws Exception {
+ TableConfig tableConfig = createTableConfig();
+ StreamIngestionConfig streamIngestionConfig =
+ new
StreamIngestionConfig(List.of(tableConfig.getIndexingConfig().getStreamConfigs()));
+ streamIngestionConfig.setPauselessConsumptionEnabled(true);
+ IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+ if (ingestionConfig == null) {
+ ingestionConfig = new IngestionConfig();
+ tableConfig.setIngestionConfig(ingestionConfig);
+ }
+ ingestionConfig.setStreamIngestionConfig(streamIngestionConfig);
+ return tableConfig;
+ }
+
+ // Exercises the real CRC comparison (on-disk SegmentMetadataImpl read +
hasSameCRC) rather than the stubbed branch.
+ @Test
+ public void testIsLocalSegmentCrcMatchingZk()
+ throws Exception {
+ try (FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager()) {
+ segmentDataManager._useRealCrcCheck = true;
+ File resourceDir = new File(TEMP_DIR, REALTIME_TABLE_NAME);
+ long localCrc = buildRealSegmentAndGetCrc(resourceDir, SEGMENT_NAME_STR);
+
+ SegmentZKMetadata matchingMetadata = new
SegmentZKMetadata(SEGMENT_NAME_STR);
+ matchingMetadata.setCrc(localCrc);
+
Assert.assertTrue(segmentDataManager.isLocalSegmentCrcMatchingZk(matchingMetadata));
+
+ SegmentZKMetadata mismatchingMetadata = new
SegmentZKMetadata(SEGMENT_NAME_STR);
+ mismatchingMetadata.setCrc(localCrc + 1);
+
Assert.assertFalse(segmentDataManager.isLocalSegmentCrcMatchingZk(mismatchingMetadata));
+ }
+ }
+
+ // When the locally-built segment cannot be read (e.g. missing/corrupt), the
CRC check must report a mismatch so the
+ // ONLINE transition downloads the committed segment instead of throwing.
+ @Test
+ public void
testIsLocalSegmentCrcMatchingZkTreatsUnreadableSegmentAsMismatch()
+ throws Exception {
+ try (FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager()) {
+ segmentDataManager._useRealCrcCheck = true;
+ // Ensure no segment exists under the resource dir so the metadata read
fails.
+ FileUtils.deleteQuietly(new File(new File(TEMP_DIR,
REALTIME_TABLE_NAME), SEGMENT_NAME_STR));
+ SegmentZKMetadata metadata = new SegmentZKMetadata(SEGMENT_NAME_STR);
+ metadata.setCrc(12345L);
+
Assert.assertFalse(segmentDataManager.isLocalSegmentCrcMatchingZk(metadata));
+ }
+ }
+
+ private long buildRealSegmentAndGetCrc(File resourceDir, String segmentName)
+ throws Exception {
+ Schema schema = Fixtures.createSchema();
+ TableConfig tableConfig = createTableConfig();
+ SegmentGeneratorConfig generatorConfig = new
SegmentGeneratorConfig(tableConfig, schema);
+ generatorConfig.setOutDir(resourceDir.getAbsolutePath());
+ generatorConfig.setSegmentName(segmentName);
+ List<GenericRow> rows = List.of(Fixtures.createSingleRow(1L),
Fixtures.createSingleRow(2L));
+ try (GenericRowRecordReader recordReader = new
GenericRowRecordReader(rows)) {
+ SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
+ driver.init(generatorConfig, recordReader);
+ driver.build();
+ }
+ return Long.parseLong(new SegmentMetadataImpl(new File(resourceDir,
segmentName)).getCrc());
+ }
+
@Test
public void testEndCriteriaChecking()
throws Exception {
@@ -1166,6 +1329,12 @@ public class RealtimeSegmentDataManagerTest {
public Field _stopReason;
public Field _segmentBuildFailedWithDeterministicError;
public boolean _failSegmentBuildAndReplace = false;
+ // When set, buildSegmentAndReplace runs the real implementation
(including the CRC guard) instead of being
+ // short-circuited. Independently, isLocalSegmentCrcMatchingZk returns
_localSegmentCrcMatchesZk unless _useRealCrcCheck is set.
+ public boolean _useRealBuildAndReplace = false;
+ public boolean _localSegmentCrcMatchesZk = true;
+ // When set, isLocalSegmentCrcMatchingZk runs the real on-disk metadata
read + CRC comparison.
+ public boolean _useRealCrcCheck = false;
private Field _streamMsgOffsetFactory;
public LinkedList<LongMsgOffset> _consumeOffsets = new LinkedList<>();
public LinkedList<SegmentCompletionProtocol.Response> _responses = new
LinkedList<>();
@@ -1180,6 +1349,7 @@ public class RealtimeSegmentDataManagerTest {
public boolean _postConsumeStoppedCalled = false;
public Map<Integer, ConsumerCoordinator> _consumerCoordinatorMap;
public boolean _stubConsumeLoop = true;
+ public RealtimeTableDataManager _tableDataManager;
private TimeSupplier _timeSupplier;
private boolean _indexCapacityThresholdBreached;
@@ -1216,6 +1386,7 @@ public class RealtimeSegmentDataManagerTest {
_streamMsgOffsetFactory.setAccessible(true);
_streamMsgOffsetFactory.set(this, new LongMsgOffsetFactory());
_timeSupplier = timeSupplier;
+ _tableDataManager = realtimeTableDataManager;
}
public String getStopReason() {
@@ -1317,12 +1488,24 @@ public class RealtimeSegmentDataManagerTest {
}
@Override
- protected boolean buildSegmentAndReplace() {
+ protected boolean buildSegmentAndReplace(SegmentZKMetadata
committedSegmentZKMetadata)
+ throws Exception {
terminateLoopIfNecessary();
_buildAndReplaceCalled = true;
+ if (_useRealBuildAndReplace) {
+ return super.buildSegmentAndReplace(committedSegmentZKMetadata);
+ }
return !_failSegmentBuildAndReplace;
}
+ @Override
+ protected boolean isLocalSegmentCrcMatchingZk(SegmentZKMetadata
committedSegmentZKMetadata) {
+ if (_useRealCrcCheck) {
+ return super.isLocalSegmentCrcMatchingZk(committedSegmentZKMetadata);
+ }
+ return _localSegmentCrcMatchesZk;
+ }
+
@Override
protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
terminateLoopIfNecessary();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]