This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d54ec21ad56 Add check to not enable dropOutOfOrderRecord or
outOfOrderColumn when consistencyMode != NONE (#17550)
d54ec21ad56 is described below
commit d54ec21ad56a7885cb66ff3dc6d05ddcae2dd06b
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Thu Jan 22 20:20:19 2026 -0800
Add check to not enable dropOutOfOrderRecord or outOfOrderColumn when
consistencyMode != NONE (#17550)
---
.../realtime/RealtimeSegmentDataManager.java | 4 +-
.../indexsegment/mutable/MutableSegmentImpl.java | 5 +
.../upsert/BasePartitionUpsertMetadataManager.java | 19 ++--
.../pinot/segment/local/upsert/UpsertContext.java | 22 +++-
.../segment/local/utils/TableConfigUtils.java | 12 +++
.../segment/local/utils/TableConfigUtilsTest.java | 115 +++++++++++++++++++++
6 files changed, 165 insertions(+), 12 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 788c1b14bc5..5f4fdc22138 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1520,8 +1520,8 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
if (_realtimeTableDataManager.isUpsertEnabled()
&& _realtimeTableDataManager.getTableUpsertMetadataManager()
!= null) {
UpsertContext context =
_realtimeTableDataManager.getTableUpsertMetadataManager().getContext();
- if (_realtimeTableDataManager.isPartialUpsertEnabled() ||
(context.isDropOutOfOrderRecord()
- && context.getConsistencyMode() ==
UpsertConfig.ConsistencyMode.NONE)) {
+ if (_realtimeTableDataManager.isPartialUpsertEnabled() ||
context.isDropOutOfOrderRecord()
+ || context.getOutOfOrderRecordColumn() != null) {
_serverMetrics.addMeteredTableValue(_clientId,
ServerMeter.REALTIME_ROWS_AHEAD_OF_ZK, 1L);
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index c13a17ec430..2e8d9bb4bf5 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -640,6 +640,11 @@ public class MutableSegmentImpl implements MutableSegment {
if (isUpsertEnabled()) {
RecordInfo recordInfo = getRecordInfo(row, numDocsIndexed);
GenericRow updatedRow =
_partitionUpsertMetadataManager.updateRecord(row, recordInfo);
+ // NOTE: out-of-order records cannot be dropped or marked when a consistent upsert view is enabled.
+ // Because the record is indexed and the _numDocsIndexed counter is updated before the upsert metadata is
+ // updated, we cannot actually drop such a record or mark it as out-of-order. This ordering is required for the
+ // consistent upsert view; otherwise a query could miss the latest doc because of the 'docId < _numDocs' check
+ // in the query filter operators. As a consequence, the record becomes queryable before the validDocIds bitmaps
+ // are updated.
if (_upsertConsistencyMode != UpsertConfig.ConsistencyMode.NONE) {
updateDictionary(updatedRow);
addNewRow(numDocsIndexed, updatedRow);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 1c64c6ee3b9..ab414dd6ede 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -675,15 +675,16 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
MutableRoaringBitmap validDocIdsForOldSegment, String segmentName) {
int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
boolean isConsumingSegmentSeal = !(oldSegment instanceof ImmutableSegment);
- // For partial-upsert table and upsert table with dropOutOfOrder=true &
consistencyMode = NONE, we do not store
- // the previous record location when removing the primary keys not
replaced, it can potentially cause inconsistency
- // between replicas. This can happen when a consuming segment is replaced
by a committed segment that is consumed
- // from a different server with different records (some stream consumer
cannot guarantee consuming the messages in
- // the same order/ when a segment is replaced with lesser consumed rows
from the other server).
- if (isConsumingSegmentSeal && _context.isDropOutOfOrderRecord()
- && _context.getConsistencyMode() == UpsertConfig.ConsistencyMode.NONE)
{
+ // For partial-upsert tables and upsert tables with dropOutOfOrderRecord=true or outOfOrderRecordColumn configured,
+ // the previous record locations are not stored when the primary keys that are not replaced are removed. This can
+ // lead to inconsistencies across replicas when a consuming segment is replaced by a committed segment generated
+ // on a different server with a different set of records. Such scenarios can occur when stream consumers do not
+ // guarantee the same consumption order, or when a segment with fewer consumed rows replaces another segment.
+ // To prevent these inconsistencies, the metadata changes are reverted and a segment replacement is triggered so
+ // that consistency across replicas can be restored.
+ if (isConsumingSegmentSeal && (_context.isDropOutOfOrderRecord() ||
_context.getOutOfOrderRecordColumn() != null)) {
_logger.warn("Found {} primary keys not replaced when sealing consuming
segment: {} for upsert table with "
- + "dropOutOfOrderRecord enabled with no consistency mode. This
can potentially cause inconsistency "
+ + "dropOutOfOrderRecord or outOfOrderRecordColumn enabled. This
can potentially cause inconsistency "
+ "between replicas. Reverting back metadata changes and
triggering segment replacement.",
numKeysNotReplaced,
segmentName);
@@ -735,7 +736,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
+ "Proceeding with current state which may cause
inconsistency.", MAX_UPSERT_REVERT_RETRIES,
segmentName,
numKeysStillNotReplaced);
- if (_context.isDropOutOfOrderRecord() && _context.getConsistencyMode()
== UpsertConfig.ConsistencyMode.NONE) {
+ if (_context.isDropOutOfOrderRecord() ||
_context.getOutOfOrderRecordColumn() != null) {
_serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
numKeysStillNotReplaced);
} else if (_partialUpsertHandler != null) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
index 41aace4c040..d576e325979 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
@@ -30,9 +30,12 @@ import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class UpsertContext {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(UpsertContext.class);
private final TableConfig _tableConfig;
private final Schema _schema;
private final List<String> _primaryKeyColumns;
@@ -64,7 +67,6 @@ public class UpsertContext {
@Nullable
private final TableDataManager _tableDataManager;
private final File _tableIndexDir;
-
private UpsertContext(TableConfig tableConfig, Schema schema, List<String>
primaryKeyColumns,
HashFunction hashFunction, List<String> comparisonColumns, @Nullable
PartialUpsertHandler partialUpsertHandler,
@Nullable String deleteRecordColumn, boolean dropOutOfOrderRecord,
@Nullable String outOfOrderRecordColumn,
@@ -223,6 +225,7 @@ public class UpsertContext {
private PartialUpsertHandler _partialUpsertHandler;
private String _deleteRecordColumn;
private boolean _dropOutOfOrderRecord;
+ @Nullable
private String _outOfOrderRecordColumn;
private boolean _enableSnapshot;
private boolean _enablePreload;
@@ -355,6 +358,23 @@ public class UpsertContext {
Preconditions.checkState(_tableDataManager != null, "Either table data
manager or table index dir must be set");
_tableIndexDir = _tableDataManager.getTableDataDir();
}
+ // dropOutOfOrderRecord and outOfOrderRecordColumn are not supported
when consistencyMode is SYNC or SNAPSHOT.
+ // In these modes, records are indexed before metadata is updated, so we
can't drop or mark out-of-order records.
+ // Disable them silently for backward compatibility with existing tables.
+ if (_consistencyMode != UpsertConfig.ConsistencyMode.NONE) {
+ if (_dropOutOfOrderRecord) {
+ _dropOutOfOrderRecord = false;
+ LOGGER.warn(
+ "dropOutOfOrderRecord is not supported when consistencyMode is
set, disabling dropOutOfOrderRecord to get"
+ + "consistent mode: {} for the table: {}", _consistencyMode,
_tableConfig.getTableName());
+ }
+ if (_outOfOrderRecordColumn != null) {
+ _outOfOrderRecordColumn = null;
+ LOGGER.warn(
+ "outOfOrderRecordColumn is not supported when consistencyMode is
set, removing outOfOrderRecordColumn "
+ + "to get consistent mode: {} for the table: {}",
_consistencyMode, _tableConfig.getTableName());
+ }
+ }
return new UpsertContext(_tableConfig, _schema, _primaryKeyColumns,
_hashFunction, _comparisonColumns,
_partialUpsertHandler, _deleteRecordColumn, _dropOutOfOrderRecord,
_outOfOrderRecordColumn, _enableSnapshot,
_enablePreload, _metadataTTL, _deletedKeysTTL,
_enableDeletedKeysCompactionConsistency, _consistencyMode,
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index dc04193737c..f7e89f6660c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -881,6 +881,18 @@ public final class TableConfigUtils {
Preconditions.checkState(upsertConfig.getNewSegmentTrackingTimeMs() >
0,
"Positive newSegmentTrackingTimeMs is required to enable
consistency mode: "
+ upsertConfig.getConsistencyMode());
+ // dropOutOfOrderRecord and outOfOrderRecordColumn are not supported with SYNC/SNAPSHOT consistency mode
+ // because records must be indexed before metadata is updated. By the time we know whether a record is
+ // out-of-order, it has already been indexed; as of today, we can neither skip indexing it nor update its
+ // out-of-order column value.
+ Preconditions.checkState(!upsertConfig.isDropOutOfOrderRecord(),
+ "dropOutOfOrderRecord cannot be enabled when consistencyMode is
%s. "
+ + "Out-of-order records can only be dropped in NONE
consistency mode.",
+ upsertConfig.getConsistencyMode());
+ Preconditions.checkState(upsertConfig.getOutOfOrderRecordColumn() ==
null,
+ "outOfOrderRecordColumn cannot be configured when consistencyMode
is %s. "
+ + "Out-of-order record marking is only supported in NONE
consistency mode.",
+ upsertConfig.getConsistencyMode());
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index ceb7df1d163..ba2495355d3 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -2501,6 +2501,121 @@ public class TableConfigUtilsTest {
assertEquals(e.getMessage(), "The outOfOrderRecordColumn must be a
single-valued BOOLEAN column");
}
+ // test dropOutOfOrderRecord cannot be enabled with SYNC consistency mode
+ schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+ .setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
+ .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("myPkCol", FieldSpec.DataType.STRING)
+ .build();
+ streamConfigs = getStreamConfigs();
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setDropOutOfOrderRecord(true);
+ upsertConfig.setConsistencyMode(UpsertConfig.ConsistencyMode.SYNC);
+ upsertConfig.setNewSegmentTrackingTimeMs(60000L);
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+ .setStreamConfigs(streamConfigs)
+ .setUpsertConfig(upsertConfig)
+ .setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+ .build();
+ try {
+ TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+ fail("Expected IllegalStateException for dropOutOfOrderRecord with SYNC
consistency mode");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("dropOutOfOrderRecord cannot be
enabled when consistencyMode is SYNC"));
+ }
+
+ // test dropOutOfOrderRecord cannot be enabled with SNAPSHOT consistency
mode
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setDropOutOfOrderRecord(true);
+ upsertConfig.setConsistencyMode(UpsertConfig.ConsistencyMode.SNAPSHOT);
+ upsertConfig.setNewSegmentTrackingTimeMs(60000L);
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+ .setStreamConfigs(streamConfigs)
+ .setUpsertConfig(upsertConfig)
+ .setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+ .build();
+ try {
+ TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+ fail("Expected IllegalStateException for dropOutOfOrderRecord with
SNAPSHOT consistency mode");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("dropOutOfOrderRecord cannot be
enabled when consistencyMode is SNAPSHOT"));
+ }
+
+ // test dropOutOfOrderRecord is allowed with NONE consistency mode (should
not throw)
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setDropOutOfOrderRecord(true);
+ upsertConfig.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE);
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+ .setStreamConfigs(streamConfigs)
+ .setUpsertConfig(upsertConfig)
+ .setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+ .build();
+ TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+
+ // test outOfOrderRecordColumn cannot be enabled with SYNC consistency mode
+ schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+ .setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
+ .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("myPkCol", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("isOutOfOrder", FieldSpec.DataType.BOOLEAN)
+ .build();
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setOutOfOrderRecordColumn("isOutOfOrder");
+ upsertConfig.setConsistencyMode(UpsertConfig.ConsistencyMode.SYNC);
+ upsertConfig.setNewSegmentTrackingTimeMs(60000L);
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+ .setStreamConfigs(streamConfigs)
+ .setUpsertConfig(upsertConfig)
+ .setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+ .build();
+ try {
+ TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+ fail("Expected IllegalStateException for outOfOrderRecordColumn with
SYNC consistency mode");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("outOfOrderRecordColumn cannot be
configured when consistencyMode is SYNC"));
+ }
+
+ // test outOfOrderRecordColumn cannot be enabled with SNAPSHOT consistency
mode
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setOutOfOrderRecordColumn("isOutOfOrder");
+ upsertConfig.setConsistencyMode(UpsertConfig.ConsistencyMode.SNAPSHOT);
+ upsertConfig.setNewSegmentTrackingTimeMs(60000L);
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+ .setStreamConfigs(streamConfigs)
+ .setUpsertConfig(upsertConfig)
+ .setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+ .build();
+ try {
+ TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+ fail("Expected IllegalStateException for outOfOrderRecordColumn with
SNAPSHOT consistency mode");
+ } catch (IllegalStateException e) {
+ assertTrue(
+ e.getMessage().contains("outOfOrderRecordColumn cannot be configured
when consistencyMode is SNAPSHOT"));
+ }
+
+ // test outOfOrderRecordColumn is allowed with NONE consistency mode
(should not throw)
+ schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+ .setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
+ .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("myPkCol", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("isOutOfOrder", FieldSpec.DataType.BOOLEAN)
+ .build();
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setOutOfOrderRecordColumn("isOutOfOrder");
+ upsertConfig.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE);
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+ .setStreamConfigs(streamConfigs)
+ .setUpsertConfig(upsertConfig)
+ .setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+ .build();
+ TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+
// test enableDeletedKeysCompactionConsistency shouldn't exist with
metadataTTL
upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
upsertConfig.setEnableDeletedKeysCompactionConsistency(true);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]