This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7eae993760 Do not allow consumption for Dedup Tables enabled during
segment download and replacement (#15268)
7eae993760 is described below
commit 7eae993760f99582aec9eca6d4a6bf54f4f19116
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Sat Mar 15 12:22:52 2025 -0400
Do not allow consumption for Dedup Tables enabled during segment download
and replacement (#15268)
* Do not allow consumption for dedup tables and full upsert tables with
out-of-order entries enabled.
* Add server-level and table-level settings for dedup as well, to tune
consumption during commit
---
.../realtime/RealtimeSegmentDataManager.java | 22 +++++++++++++++++-----
.../dedup/TableDedupMetadataManagerFactory.java | 8 ++++++++
.../apache/pinot/spi/config/table/DedupConfig.java | 11 +++++++++++
3 files changed, 36 insertions(+), 5 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index a57a3cc278..17c425996f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1687,11 +1687,9 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
setConsumeEndTime(segmentZKMetadata, _consumeStartTime);
_segmentCommitterFactory =
new SegmentCommitterFactory(_segmentLogger, _protocolHandler,
tableConfig, indexLoadingConfig, serverMetrics);
- _segmentLogger
- .info("Starting consumption on realtime consuming segment {}
maxRowCount {} maxEndTime {}", llcSegmentName,
- _segmentMaxRowCount, new DateTime(_consumeEndTime,
DateTimeZone.UTC));
- _allowConsumptionDuringCommit =
!_realtimeTableDataManager.isPartialUpsertEnabled() ? true
- :
_tableConfig.getUpsertConfig().isAllowPartialUpsertConsumptionDuringCommit();
+ _segmentLogger.info("Starting consumption on realtime consuming segment
{} maxRowCount {} maxEndTime {}",
+ llcSegmentName, _segmentMaxRowCount, new DateTime(_consumeEndTime,
DateTimeZone.UTC));
+ _allowConsumptionDuringCommit = isConsumptionAllowedDuringCommit();
} catch (Throwable t) {
// In case of exception thrown here, segment goes to ERROR state. Then
any attempt to reset the segment from
// ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the
semaphore is acquired, but not released.
@@ -1725,6 +1723,20 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
}
}
+ // Consumption while downloading and replacing the slow replicas is not
allowed for the following tables:
+ // 1. Partial Upserts
+ // 2. Dedup Tables
+ // For the above table types, we would be looking into the metadata
information when inserting a new record,
+ // so it is not right to allow consumption while downloading and replacing
the consuming segment as we might see
+ // duplicates in dedup tables and inconsistent entries compared to lead
replicas for partial
+ // upsert tables. If tables are dedup/partial upsert enabled check for table
and server config properties to see if
+ // consumption is allowed
+ private boolean isConsumptionAllowedDuringCommit() {
+ return (!_realtimeTableDataManager.isDedupEnabled() ||
_tableConfig.getDedupConfig()
+ .isAllowDedupConsumptionDuringCommit()) &&
(!_realtimeTableDataManager.isPartialUpsertEnabled()
+ ||
_tableConfig.getUpsertConfig().isAllowPartialUpsertConsumptionDuringCommit());
+ }
+
private void setConsumeEndTime(SegmentZKMetadata segmentZKMetadata, long
now) {
long maxConsumeTimeMillis = _streamConfig.getFlushThresholdTimeMillis();
_consumeEndTime = segmentZKMetadata.getCreationTime() +
maxConsumeTimeMillis;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java
index a94b4385a5..7d34286335 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java
@@ -39,6 +39,9 @@ public class TableDedupMetadataManagerFactory {
public static final String DEDUP_DEFAULT_METADATA_MANAGER_CLASS =
"default.metadata.manager.class";
public static final String DEDUP_DEFAULT_ENABLE_PRELOAD =
"default.enable.preload";
+ public static final String
DEDUP_DEFAULT_ALLOW_DEDUP_CONSUMPTION_DURING_COMMIT =
+ "default.allow.dedup.consumption.during.commit";
+
public static TableDedupMetadataManager create(TableConfig tableConfig,
Schema schema,
TableDataManager tableDataManager, ServerMetrics serverMetrics,
@Nullable PinotConfiguration instanceDedupConfig) {
@@ -59,6 +62,11 @@ public class TableDedupMetadataManagerFactory {
dedupConfig.setEnablePreload(
Boolean.parseBoolean(instanceDedupConfig.getProperty(DEDUP_DEFAULT_ENABLE_PRELOAD,
"false")));
}
+ // server level config honoured only when table level config is not set
to true
+ if (!dedupConfig.isAllowDedupConsumptionDuringCommit()) {
+ dedupConfig.setAllowDedupConsumptionDuringCommit(Boolean.parseBoolean(
+
instanceDedupConfig.getProperty(DEDUP_DEFAULT_ALLOW_DEDUP_CONSUMPTION_DURING_COMMIT,
"false")));
+ }
}
if (StringUtils.isNotEmpty(metadataManagerClass)) {
LOGGER.info("Creating TableDedupMetadataManager with class: {} for
table: {}", metadataManagerClass,
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
index b1e6caec30..b3dc13f420 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
@@ -47,6 +47,9 @@ public class DedupConfig extends BaseJsonConfig {
@JsonPropertyDescription("Whether to preload segments for fast dedup
metadata recovery")
private boolean _enablePreload;
+ @JsonPropertyDescription("Whether to pause dedup table's partition
consumption during commit")
+ private boolean _allowDedupConsumptionDuringCommit;
+
public DedupConfig(@JsonProperty(value = "dedupEnabled", required = true)
boolean dedupEnabled,
@JsonProperty(value = "hashFunction") HashFunction hashFunction) {
this(dedupEnabled, hashFunction, null, null, 0, null, false);
@@ -100,4 +103,12 @@ public class DedupConfig extends BaseJsonConfig {
public void setEnablePreload(boolean enablePreload) {
_enablePreload = enablePreload;
}
+
+ public boolean isAllowDedupConsumptionDuringCommit() {
+ return _allowDedupConsumptionDuringCommit;
+ }
+
+ public void setAllowDedupConsumptionDuringCommit(boolean
allowDedupConsumptionDuringCommit) {
+ _allowDedupConsumptionDuringCommit = allowDedupConsumptionDuringCommit;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]