This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a7c99db6d9 Do not modify cached schema in TableDataManager (#15690)
a7c99db6d9 is described below
commit a7c99db6d9f4773d0a0d409b6ce437c8a52540c8
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri May 2 12:11:11 2025 -0600
Do not modify cached schema in TableDataManager (#15690)
---
.../core/data/manager/BaseTableDataManager.java | 16 +++++--------
.../manager/offline/DimensionTableDataManager.java | 17 ++++++--------
.../manager/realtime/RealtimeTableDataManager.java | 27 +++++++++-------------
.../plan/server/ServerPlanRequestUtils.java | 8 ++++---
.../testutils/MockInstanceDataManagerFactory.java | 5 ++--
.../local/data/manager/TableDataManager.java | 9 +++++---
.../recordtransformer/CompositeTransformer.java | 6 +++++
.../starter/helix/HelixInstanceDataManager.java | 2 ++
8 files changed, 45 insertions(+), 45 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 67088db6ab..1e283ca29b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -146,8 +146,8 @@ public abstract class BaseTableDataManager implements
TableDataManager {
// Cache used for identifying segments which could not be acquired since
they were recently deleted.
protected Cache<String, String> _recentlyDeletedSegments;
- // Caches the latest IndexLoadingConfig. The cached IndexLoadingConfig
should not be modified.
- protected volatile IndexLoadingConfig _indexLoadingConfig;
+ // Caches the latest TableConfig and Schema pair. The cache should not be
modified.
+ protected volatile Pair<TableConfig, Schema> _cachedTableConfigAndSchema;
protected volatile boolean _shutDown;
@@ -190,6 +190,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
.maximumSize(instanceDataManagerConfig.getDeletedSegmentsCacheSize())
.expireAfterWrite(instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes(),
TimeUnit.MINUTES)
.build();
+ _cachedTableConfigAndSchema = Pair.of(tableConfig, schema);
_peerDownloadScheme =
tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
if (_peerDownloadScheme == null) {
@@ -226,7 +227,6 @@ public abstract class BaseTableDataManager implements
TableDataManager {
_numSegmentsAcquiredDownloadSemaphore = null;
}
_logger = LoggerFactory.getLogger(_tableNameWithType + "-" +
getClass().getSimpleName());
- createAndCacheIndexLoadingConfig(tableConfig, schema);
doInit();
@@ -387,19 +387,15 @@ public abstract class BaseTableDataManager implements
TableDataManager {
Preconditions.checkState(tableConfig != null, "Failed to find table config
for table: %s", _tableNameWithType);
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType);
Preconditions.checkState(schema != null, "Failed to find schema for table:
%s", _tableNameWithType);
- return createAndCacheIndexLoadingConfig(tableConfig, schema);
- }
-
- private IndexLoadingConfig createAndCacheIndexLoadingConfig(TableConfig
tableConfig, Schema schema) {
IndexLoadingConfig indexLoadingConfig = new
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema);
indexLoadingConfig.setTableDataDir(_tableDataDir);
- _indexLoadingConfig = indexLoadingConfig;
+ _cachedTableConfigAndSchema = Pair.of(tableConfig, schema);
return indexLoadingConfig;
}
@Override
- public IndexLoadingConfig getIndexLoadingConfig() {
- return _indexLoadingConfig;
+ public Pair<TableConfig, Schema> getCachedTableConfigAndSchema() {
+ return _cachedTableConfigAndSchema;
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
index 0ac31abc3b..884b2565b6 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
@@ -35,10 +35,10 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -110,15 +110,14 @@ public class DimensionTableDataManager extends
OfflineTableDataManager {
protected void doInit() {
super.doInit();
- IndexLoadingConfig indexLoadingConfig = _indexLoadingConfig;
- Schema schema = indexLoadingConfig.getSchema();
- assert schema != null;
+ Pair<TableConfig, Schema> tableConfigAndSchema =
getCachedTableConfigAndSchema();
+ TableConfig tableConfig = tableConfigAndSchema.getLeft();
+ Schema schema = tableConfigAndSchema.getRight();
+
List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
"Primary key columns must be configured for dimension table: %s",
_tableNameWithType);
- TableConfig tableConfig = _indexLoadingConfig.getTableConfig();
- assert tableConfig != null;
DimensionTableConfig dimensionTableConfig =
tableConfig.getDimensionTableConfig();
if (dimensionTableConfig != null) {
_disablePreload = dimensionTableConfig.isDisablePreload();
@@ -207,8 +206,7 @@ public class DimensionTableDataManager extends
OfflineTableDataManager {
// loading is in progress.
int token = _loadToken.incrementAndGet();
- Schema schema = _indexLoadingConfig.getSchema();
- assert schema != null;
+ Schema schema = getCachedTableConfigAndSchema().getRight();
List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
"Primary key columns must be configured for dimension table: %s",
_tableNameWithType);
@@ -283,8 +281,7 @@ public class DimensionTableDataManager extends
OfflineTableDataManager {
// loading is in progress.
int token = _loadToken.incrementAndGet();
- Schema schema = _indexLoadingConfig.getSchema();
- assert schema != null;
+ Schema schema = getCachedTableConfigAndSchema().getRight();
List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
"Primary key columns must be configured for dimension table: %s",
_tableNameWithType);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 375e518f1b..cf7f3e883a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -42,6 +42,7 @@ import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerGauge;
@@ -86,6 +87,7 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.spi.utils.retry.RetriableOperationException;
@@ -199,22 +201,17 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
// Set up dedup/upsert metadata manager
// NOTE: Dedup/upsert has to be set up when starting the server. Changing
the table config without restarting the
// server won't enable/disable them on the fly.
- IndexLoadingConfig indexLoadingConfig = _indexLoadingConfig;
- TableConfig tableConfig = indexLoadingConfig.getTableConfig();
- assert tableConfig != null;
+ Pair<TableConfig, Schema> tableConfigAndSchema =
getCachedTableConfigAndSchema();
+ TableConfig tableConfig = tableConfigAndSchema.getLeft();
+ Schema schema = tableConfigAndSchema.getRight();
if (tableConfig.isDedupEnabled()) {
- Schema schema = indexLoadingConfig.getSchema();
- assert schema != null;
_tableDedupMetadataManager =
TableDedupMetadataManagerFactory.create(_instanceDataManagerConfig.getDedupConfig(),
tableConfig, schema,
this);
}
-
if (tableConfig.isUpsertEnabled()) {
Preconditions.checkState(_tableDedupMetadataManager == null,
"Dedup and upsert cannot be both enabled for table: %s",
_tableNameWithType);
- Schema schema = indexLoadingConfig.getSchema();
- assert schema != null;
_tableUpsertMetadataManager =
TableUpsertMetadataManagerFactory.create(_instanceDataManagerConfig.getUpsertConfig(),
tableConfig, schema,
this);
@@ -492,7 +489,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
@Override
public void addConsumingSegment(String segmentName)
- throws AttemptsExceededException, RetriableOperationException {
+ throws Exception {
Preconditions.checkState(!_shutDown,
"Table data manager is already shut down, cannot add CONSUMING
segment: %s to table: %s", segmentName,
_tableNameWithType);
@@ -511,7 +508,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
}
private void doAddConsumingSegment(String segmentName)
- throws AttemptsExceededException, RetriableOperationException {
+ throws Exception {
SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
if (zkMetadata.getStatus().isCompleted()) {
// NOTE:
@@ -542,6 +539,8 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
TableConfig tableConfig = indexLoadingConfig.getTableConfig();
Schema schema = indexLoadingConfig.getSchema();
assert tableConfig != null && schema != null;
+ // Clone the schema to avoid modifying the cached one
+ schema = JsonUtils.jsonNodeToObject(schema.toJsonObject(), Schema.class);
validate(tableConfig, schema);
VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(schema,
segmentName);
setDefaultTimeValueIfInvalid(tableConfig, schema, zkMetadata);
@@ -587,9 +586,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
String segmentName = zkMetadata.getSegmentName();
Preconditions.checkState(status == Status.COMMITTING, "Invalid status: %s
for segment: %s to be downloaded", status,
segmentName);
- TableConfig tableConfig = _indexLoadingConfig.getTableConfig();
- assert tableConfig != null;
- long downloadTimeoutMs = getDownloadTimeoutMs(tableConfig);
+ long downloadTimeoutMs =
getDownloadTimeoutMs(getCachedTableConfigAndSchema().getLeft());
long deadlineMs = System.currentTimeMillis() + downloadTimeoutMs;
while (System.currentTimeMillis() < deadlineMs) {
// ZK Metadata may change during segment download process; fetch it on
every retry.
@@ -864,9 +861,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
@Nullable
public StreamIngestionConfig getStreamIngestionConfig() {
- TableConfig tableConfig = _indexLoadingConfig.getTableConfig();
- assert tableConfig != null;
- IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+ IngestionConfig ingestionConfig =
getCachedTableConfigAndSchema().getLeft().getIngestionConfig();
return ingestionConfig != null ?
ingestionConfig.getStreamIngestionConfig() : null;
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index adcae1f30b..8bd40d51c0 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.Expression;
@@ -51,9 +52,10 @@ import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.PlanNodeToOpChain;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -212,8 +214,8 @@ public class ServerPlanRequestUtils {
for (QueryRewriter queryRewriter : QUERY_REWRITERS) {
pinotQuery = queryRewriter.rewrite(pinotQuery);
}
- IndexLoadingConfig indexLoadingConfig =
tableDataManager.getIndexLoadingConfig();
- QUERY_OPTIMIZER.optimize(pinotQuery, indexLoadingConfig.getTableConfig(),
indexLoadingConfig.getSchema());
+ Pair<TableConfig, Schema> tableConfigAndSchema =
tableDataManager.getCachedTableConfigAndSchema();
+ QUERY_OPTIMIZER.optimize(pinotQuery, tableConfigAndSchema.getLeft(),
tableConfigAndSchema.getRight());
// 2. Update query options according to requestMetadataMap
updateQueryOptions(pinotQuery, executionContext);
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
index 36cbdb53f6..3e75dfb100 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
@@ -26,13 +26,13 @@ import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -155,8 +155,7 @@ public class MockInstanceDataManagerFactory {
when(tableDataManager.getTableName()).thenReturn(tableNameWithType);
TableConfig tableConfig = createTableConfig(tableNameWithType);
Schema schema =
_schemaMap.get(TableNameBuilder.extractRawTableName(tableNameWithType));
- IndexLoadingConfig indexLoadingConfig = new
IndexLoadingConfig(tableConfig, schema);
-
when(tableDataManager.getIndexLoadingConfig()).thenReturn(indexLoadingConfig);
+
when(tableDataManager.getCachedTableConfigAndSchema()).thenReturn(Pair.of(tableConfig,
schema));
Map<String, SegmentDataManager> segmentDataManagerMap =
segmentList.stream().collect(Collectors.toMap(IndexSegment::getSegmentName,
ImmutableSegmentDataManager::new));
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 727dcf6c92..5e64fd953b 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -312,14 +312,17 @@ public interface TableDataManager {
/**
* Fetches the table config and schema for the table from ZK, then constructs
the index loading config with them.
+ * The fetched table config and schema are then cached in the table data
manager, and should not be modified. The
+ * cache is used mainly for query time access of table config and schema
without accessing ZK.
*/
IndexLoadingConfig fetchIndexLoadingConfig();
/**
- * Returns the cached latest {@link IndexLoadingConfig} for the table. The
cache is refreshed when invoking
- * {@link #fetchIndexLoadingConfig()}.
+ * Returns the cached latest {@link TableConfig} and {@link Schema} pair for
the table. The cache is refreshed when
+ * invoking {@link #fetchIndexLoadingConfig()}, and should not be modified.
We cache them as a pair to ensure they are
+ * updated together to avoid race conditions.
*/
- IndexLoadingConfig getIndexLoadingConfig();
+ Pair<TableConfig, Schema> getCachedTableConfigAndSchema();
/**
* Interface to handle segment state transitions from CONSUMING to DROPPED
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
index c639eac2d4..8643dbd43c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig;
@@ -145,6 +146,11 @@ public class CompositeTransformer implements
RecordTransformer {
}
}
+ public static CompositeTransformer getDefaultTransformer(TableConfig
tableConfig, Schema schema, @Nullable
+ SegmentZKMetadata segmentZKMetadata) {
+ return new CompositeTransformer(getDefaultTransformers(tableConfig,
schema));
+ }
+
public static CompositeTransformer getDefaultTransformer(TableConfig
tableConfig, Schema schema) {
return new CompositeTransformer(getDefaultTransformers(tableConfig,
schema));
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index b3ed24fa81..98da1ee05d 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -66,6 +66,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.TimestampIndexUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -297,6 +298,7 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
}
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
tableNameWithType);
Preconditions.checkState(schema != null, "Failed to find schema for table:
%s", tableNameWithType);
+ TimestampIndexUtils.applyTimestampIndex(tableConfig, schema);
TableDataManager tableDataManager =
_tableDataManagerProvider.getTableDataManager(tableConfig, schema,
_segmentReloadSemaphore,
_segmentReloadExecutor, _segmentPreloadExecutor, _errorCache,
_isServerReadyToServeQueries);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]