This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9ee84c9de1 Add a config to skip updating dedup metadata for non-default tier segments (#15576)
9ee84c9de1 is described below
commit 9ee84c9de13cd32ed6d4f0bfc938f292737c0016
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Apr 16 19:05:40 2025 -0600
Add a config to skip updating dedup metadata for non-default tier segments (#15576)
---
.../core/data/manager/BaseTableDataManager.java | 10 +++++-----
.../manager/offline/DimensionTableDataManager.java | 5 +++--
.../manager/realtime/RealtimeTableDataManager.java | 11 +++++++----
.../local/data/manager/TableDataManager.java | 18 +++++++++++++++++-
.../local/dedup/BaseTableDedupMetadataManager.java | 4 ++++
.../pinot/segment/local/dedup/DedupContext.java | 21 +++++++++++++++++----
.../apache/pinot/spi/config/table/DedupConfig.java | 13 +++++++++++++
.../org/apache/pinot/spi/utils/CommonConstants.java | 1 +
8 files changed, 67 insertions(+), 16 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index c7773c8b09..67088db6ab 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -324,7 +324,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
* @param immutableSegment Immutable segment to add
*/
@Override
- public void addSegment(ImmutableSegment immutableSegment) {
+ public void addSegment(ImmutableSegment immutableSegment, @Nullable
SegmentZKMetadata zkMetadata) {
String segmentName = immutableSegment.getSegmentName();
Preconditions.checkState(!_shutDown, "Table data manager is already shut
down, cannot add segment: %s to table: %s",
segmentName, _tableNameWithType);
@@ -437,7 +437,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
String segmentName = zkMetadata.getSegmentName();
_logger.info("Downloading and loading segment: {}", segmentName);
File indexDir = downloadSegment(zkMetadata);
- addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
_segmentOperationsThrottler));
+ addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
_segmentOperationsThrottler), zkMetadata);
_logger.info("Downloaded and loaded segment: {} with CRC: {} on tier: {}",
segmentName, zkMetadata.getCrc(),
TierConfigUtils.normalizeTierName(zkMetadata.getTier()));
}
@@ -817,7 +817,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
_logger.info("Reloading segment: {} using existing segment directory
as no reprocessing needed", segmentName);
// No reprocessing needed, reuse the same segment
ImmutableSegment segment =
ImmutableSegmentLoader.load(segmentDirectory, indexLoadingConfig);
- addSegment(segment);
+ addSegment(segment, zkMetadata);
return;
}
// Create backup directory to handle failure of segment reloading.
@@ -838,7 +838,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
_logger.info("Loading segment: {} from indexDir: {} to tier: {}",
segmentName, indexDir,
TierConfigUtils.normalizeTierName(zkMetadata.getTier()));
ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir,
indexLoadingConfig, _segmentOperationsThrottler);
- addSegment(segment);
+ addSegment(segment, zkMetadata);
// Remove backup directory to mark the completion of segment reloading.
removeBackup(indexDir);
@@ -1217,7 +1217,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
segmentDirectory = initSegmentDirectory(segmentName,
String.valueOf(zkMetadata.getCrc()), indexLoadingConfig);
}
ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory,
indexLoadingConfig);
- addSegment(segment);
+ addSegment(segment, zkMetadata);
_logger.info("Loaded existing segment: {} with CRC: {} on tier: {}",
segmentName, zkMetadata.getCrc(),
TierConfigUtils.normalizeTierName(segmentTier));
return true;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
index 442c62567c..0ac31abc3b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -142,8 +143,8 @@ public class DimensionTableDataManager extends
OfflineTableDataManager {
}
@Override
- public void addSegment(ImmutableSegment immutableSegment) {
- super.addSegment(immutableSegment);
+ public void addSegment(ImmutableSegment immutableSegment, @Nullable
SegmentZKMetadata zkMetadata) {
+ super.addSegment(immutableSegment, zkMetadata);
String segmentName = immutableSegment.getSegmentName();
if (loadLookupTable()) {
_logger.info("Successfully loaded lookup table after adding segment:
{}", segmentName);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 7471a5b69f..375e518f1b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -688,19 +688,22 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
}
@Override
- public void addSegment(ImmutableSegment immutableSegment) {
+ public void addSegment(ImmutableSegment immutableSegment, @Nullable
SegmentZKMetadata zkMetadata) {
String segmentName = immutableSegment.getSegmentName();
Preconditions.checkState(!_shutDown, "Table data manager is already shut
down, cannot add segment: %s to table: %s",
segmentName, _tableNameWithType);
+
if (isUpsertEnabled()) {
handleUpsert(immutableSegment);
return;
}
- if (isDedupEnabled() && immutableSegment instanceof ImmutableSegmentImpl) {
+ if (_tableDedupMetadataManager != null && immutableSegment instanceof
ImmutableSegmentImpl && (zkMetadata == null
+ || zkMetadata.getTier() == null ||
!_tableDedupMetadataManager.getContext().isIgnoreNonDefaultTiers())) {
handleDedup((ImmutableSegmentImpl) immutableSegment);
}
- super.addSegment(immutableSegment);
+
+ super.addSegment(immutableSegment, zkMetadata);
}
private void handleDedup(ImmutableSegmentImpl immutableSegment) {
@@ -821,7 +824,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
// Get a new index loading config with latest table config and schema to
load the segment
IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
- addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
_segmentOperationsThrottler));
+ addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
_segmentOperationsThrottler), zkMetadata);
_logger.info("Downloaded and replaced CONSUMING segment: {}", segmentName);
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index e8351e5cbe..727dcf6c92 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -94,10 +94,26 @@ public interface TableDataManager {
/**
* Adds a loaded immutable segment into the table.
+ * See {@link #addSegment(ImmutableSegment, SegmentZKMetadata)} for details.
+ */
+ @VisibleForTesting
+ default void addSegment(ImmutableSegment immutableSegment) {
+ addSegment(immutableSegment, null);
+ }
+
+ /**
+ * Adds a loaded immutable segment into the table.
+ * <p>If one segment already exists with the same name, replaces it with the
new one.
+ * <p>Ensures that reference count of the old segment (if replaced) is
reduced by 1, so that the last user of the old
+ * segment (or the calling thread, if there are none) removes the segment.
+ * <p>The new segment is added with reference count of 1, so that it is never
removed until a drop command comes through.
+ * <p>Segment ZK metadata might not be available when replacing a CONSUMING
segment with the locally sealed one, or when the method is
+ * invoked from tests.
+ *
* NOTE: This method is not designed to be directly used by the production
code, but can be handy to set up tests.
*/
@VisibleForTesting
- void addSegment(ImmutableSegment immutableSegment);
+ void addSegment(ImmutableSegment immutableSegment, @Nullable
SegmentZKMetadata zkMetadata);
/**
* Adds an ONLINE segment into a table.
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
index f2487aa715..e2c8f986e2 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
@@ -75,6 +75,9 @@ public abstract class BaseTableDedupMetadataManager
implements TableDedupMetadat
}
}
+ boolean ignoreNonDefaultTiers = dedupConfig.getIgnoreNonDefaultTiers()
+ .isEnabled(() ->
instanceDedupConfig.getProperty(Dedup.DEFAULT_IGNORE_NON_DEFAULT_TIERS, false));
+
// NOTE: This field doesn't follow enablement override, and always enabled
if enabled at instance level.
boolean allowDedupConsumptionDuringCommit =
dedupConfig.isAllowDedupConsumptionDuringCommit();
if (!allowDedupConsumptionDuringCommit) {
@@ -91,6 +94,7 @@ public abstract class BaseTableDedupMetadataManager
implements TableDedupMetadat
.setMetadataTTL(metadataTTL)
.setDedupTimeColumn(dedupTimeColumn)
.setEnablePreload(enablePreload)
+ .setIgnoreNonDefaultTiers(ignoreNonDefaultTiers)
.setMetadataManagerConfigs(dedupConfig.getMetadataManagerConfigs())
.setAllowDedupConsumptionDuringCommit(allowDedupConsumptionDuringCommit)
.build();
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
index fb0ba48c6a..687fe9559d 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
@@ -40,6 +40,7 @@ public class DedupContext {
@Nullable
private final String _dedupTimeColumn;
private final boolean _enablePreload;
+ private final boolean _ignoreNonDefaultTiers;
@Nullable
private final Map<String, String> _metadataManagerConfigs;
@@ -54,8 +55,8 @@ public class DedupContext {
private DedupContext(TableConfig tableConfig, Schema schema, List<String>
primaryKeyColumns,
HashFunction hashFunction, double metadataTTL, @Nullable String
dedupTimeColumn, boolean enablePreload,
- @Nullable Map<String, String> metadataManagerConfigs, boolean
allowDedupConsumptionDuringCommit,
- @Nullable TableDataManager tableDataManager, File tableIndexDir) {
+ boolean ignoreNonDefaultTiers, @Nullable Map<String, String>
metadataManagerConfigs,
+ boolean allowDedupConsumptionDuringCommit, @Nullable TableDataManager
tableDataManager, File tableIndexDir) {
_tableConfig = tableConfig;
_schema = schema;
_primaryKeyColumns = primaryKeyColumns;
@@ -63,6 +64,7 @@ public class DedupContext {
_metadataTTL = metadataTTL;
_dedupTimeColumn = dedupTimeColumn;
_enablePreload = enablePreload;
+ _ignoreNonDefaultTiers = ignoreNonDefaultTiers;
_metadataManagerConfigs = metadataManagerConfigs;
_allowDedupConsumptionDuringCommit = allowDedupConsumptionDuringCommit;
_tableDataManager = tableDataManager;
@@ -98,6 +100,10 @@ public class DedupContext {
return _enablePreload;
}
+ public boolean isIgnoreNonDefaultTiers() {
+ return _ignoreNonDefaultTiers;
+ }
+
@Nullable
public Map<String, String> getMetadataManagerConfigs() {
return _metadataManagerConfigs;
@@ -125,6 +131,7 @@ public class DedupContext {
.append("metadataTTL", _metadataTTL)
.append("dedupTimeColumn", _dedupTimeColumn)
.append("enablePreload", _enablePreload)
+ .append("ignoreNonDefaultTiers", _ignoreNonDefaultTiers)
.append("metadataManagerConfigs", _metadataManagerConfigs)
.append("allowDedupConsumptionDuringCommit",
_allowDedupConsumptionDuringCommit)
.append("tableIndexDir", _tableIndexDir)
@@ -139,6 +146,7 @@ public class DedupContext {
private double _metadataTTL;
private String _dedupTimeColumn;
private boolean _enablePreload;
+ private boolean _ignoreNonDefaultTiers;
private Map<String, String> _metadataManagerConfigs;
@Deprecated
private boolean _allowDedupConsumptionDuringCommit;
@@ -180,6 +188,11 @@ public class DedupContext {
return this;
}
+ public Builder setIgnoreNonDefaultTiers(boolean ignoreNonDefaultTiers) {
+ _ignoreNonDefaultTiers = ignoreNonDefaultTiers;
+ return this;
+ }
+
public Builder setMetadataManagerConfigs(Map<String, String>
metadataManagerConfigs) {
_metadataManagerConfigs = metadataManagerConfigs;
return this;
@@ -211,8 +224,8 @@ public class DedupContext {
_tableIndexDir = _tableDataManager.getTableDataDir();
}
return new DedupContext(_tableConfig, _schema, _primaryKeyColumns,
_hashFunction, _metadataTTL, _dedupTimeColumn,
- _enablePreload, _metadataManagerConfigs,
_allowDedupConsumptionDuringCommit, _tableDataManager,
- _tableIndexDir);
+ _enablePreload, _ignoreNonDefaultTiers, _metadataManagerConfigs,
_allowDedupConsumptionDuringCommit,
+ _tableDataManager, _tableIndexDir);
}
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
index d46cc998bf..3eb03fece3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
@@ -49,6 +49,9 @@ public class DedupConfig extends BaseJsonConfig {
+ "ENABLE, DISABLE and DEFAULT (use instance level default behavior).")
private Enablement _preload = Enablement.DEFAULT;
+ @JsonPropertyDescription("Whether to ignore segments from non-default tiers
when constructing dedup metadata.")
+ private Enablement _ignoreNonDefaultTiers = Enablement.DEFAULT;
+
@JsonPropertyDescription("Custom class for dedup metadata manager. If not
specified, the default implementation "
+ "ConcurrentMapTableDedupMetadataManager will be used.")
@Nullable
@@ -133,6 +136,16 @@ public class DedupConfig extends BaseJsonConfig {
_preload = preload;
}
+ public Enablement getIgnoreNonDefaultTiers() {
+ return _ignoreNonDefaultTiers;
+ }
+
+ public void setIgnoreNonDefaultTiers(Enablement ignoreNonDefaultTiers) {
+ Preconditions.checkArgument(ignoreNonDefaultTiers != null,
+ "Ignore non-default tiers cannot be null, must be one of ENABLE,
DISABLE or DEFAULT");
+ _ignoreNonDefaultTiers = ignoreNonDefaultTiers;
+ }
+
@Nullable
public String getMetadataManagerClass() {
return _metadataManagerClass;
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 7f975d93c7..1bcbc1126e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1115,6 +1115,7 @@ public class CommonConstants {
public static final String CONFIG_PREFIX = "dedup";
public static final String DEFAULT_METADATA_MANAGER_CLASS =
"default.metadata.manager.class";
public static final String DEFAULT_ENABLE_PRELOAD =
"default.enable.preload";
+ public static final String DEFAULT_IGNORE_NON_DEFAULT_TIERS =
"default.ignore.non.default.tiers";
/// @deprecated use {@link
org.apache.pinot.spi.config.table.ingestion.ParallelSegmentConsumptionPolicy)}
instead.
@Deprecated
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]