This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 667fba096f Cache IndexLoadingConfig in TableDataManager (#15443)
667fba096f is described below
commit 667fba096f0e0c721df1c1a9943ad57a3bacef3f
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Apr 4 12:20:56 2025 -0600
Cache IndexLoadingConfig in TableDataManager (#15443)
---
.../core/data/manager/BaseTableDataManager.java | 46 ++--
.../manager/offline/DimensionTableDataManager.java | 28 +--
.../provider/DefaultTableDataManagerProvider.java | 10 +-
.../manager/provider/TableDataManagerProvider.java | 12 +-
.../manager/realtime/RealtimeTableDataManager.java | 31 +--
.../BaseTableDataManagerAcquireSegmentTest.java | 10 +-
.../BaseTableDataManagerNeedRefreshTest.java | 275 ++++++++++++---------
.../data/manager/BaseTableDataManagerTest.java | 3 +-
.../offline/DimensionTableDataManagerTest.java | 12 +-
.../realtime/RealtimeSegmentDataManagerTest.java | 16 +-
.../executor/QueryExecutorExceptionsTest.java | 2 +-
.../core/query/executor/QueryExecutorTest.java | 2 +-
.../pinot/queries/ExplainPlanQueriesTest.java | 2 +-
.../queries/SegmentWithNullValueVectorTest.java | 2 +-
.../FailureInjectingTableDataManagerProvider.java | 11 +-
.../perf/BenchmarkDimensionTableOverhead.java | 2 +-
.../local/data/manager/TableDataManager.java | 24 +-
.../pinot/server/api/resources/TablesResource.java | 5 +-
.../starter/helix/HelixInstanceDataManager.java | 7 +-
.../apache/pinot/server/api/BaseResourceTest.java | 25 +-
20 files changed, 286 insertions(+), 239 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index c506dc336c..c7773c8b09 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -119,7 +119,6 @@ public abstract class BaseTableDataManager implements
TableDataManager {
protected HelixManager _helixManager;
protected ZkHelixPropertyStore<ZNRecord> _propertyStore;
protected SegmentLocks _segmentLocks;
- protected TableConfig _tableConfig;
protected String _tableNameWithType;
protected String _tableDataDir;
protected File _indexDir;
@@ -136,9 +135,10 @@ public abstract class BaseTableDataManager implements
TableDataManager {
protected boolean _isStreamSegmentDownloadUntar;
@Nullable
protected SegmentOperationsThrottler _segmentOperationsThrottler;
+
// Semaphore to restrict the maximum number of parallel segment downloads
from deep store for a table
- private Semaphore _segmentDownloadSemaphore;
- private AtomicInteger _numSegmentsAcquiredDownloadSemaphore;
+ protected Semaphore _segmentDownloadSemaphore;
+ protected AtomicInteger _numSegmentsAcquiredDownloadSemaphore;
// Fixed size LRU cache with TableName - SegmentName pair as key, and
segment related errors as the value.
@Nullable
@@ -146,11 +146,14 @@ public abstract class BaseTableDataManager implements
TableDataManager {
// Cache used for identifying segments which could not be acquired since
they were recently deleted.
protected Cache<String, String> _recentlyDeletedSegments;
+ // Caches the latest IndexLoadingConfig. The cached IndexLoadingConfig
should not be modified.
+ protected volatile IndexLoadingConfig _indexLoadingConfig;
+
protected volatile boolean _shutDown;
@Override
public void init(InstanceDataManagerConfig instanceDataManagerConfig,
HelixManager helixManager,
- SegmentLocks segmentLocks, TableConfig tableConfig,
SegmentReloadSemaphore segmentReloadSemaphore,
+ SegmentLocks segmentLocks, TableConfig tableConfig, Schema schema,
SegmentReloadSemaphore segmentReloadSemaphore,
ExecutorService segmentReloadExecutor, @Nullable ExecutorService
segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
@Nullable SegmentOperationsThrottler segmentOperationsThrottler) {
@@ -161,14 +164,13 @@ public abstract class BaseTableDataManager implements
TableDataManager {
_helixManager = helixManager;
_propertyStore = helixManager.getHelixPropertyStore();
_segmentLocks = segmentLocks;
- _tableConfig = tableConfig;
_segmentReloadSemaphore = segmentReloadSemaphore;
_segmentReloadExecutor = segmentReloadExecutor;
_segmentPreloadExecutor = segmentPreloadExecutor;
- _authProvider =
AuthProviderUtils.extractAuthProvider(_instanceDataManagerConfig.getAuthConfig(),
null);
+ _authProvider =
AuthProviderUtils.extractAuthProvider(instanceDataManagerConfig.getAuthConfig(),
null);
_tableNameWithType = tableConfig.getTableName();
- _tableDataDir = _instanceDataManagerConfig.getInstanceDataDir() +
File.separator + _tableNameWithType;
+ _tableDataDir = instanceDataManagerConfig.getInstanceDataDir() +
File.separator + _tableNameWithType;
_indexDir = new File(_tableDataDir);
if (!_indexDir.exists()) {
Preconditions.checkState(_indexDir.mkdirs(), "Unable to create index
directory at %s. "
@@ -224,6 +226,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
_numSegmentsAcquiredDownloadSemaphore = null;
}
_logger = LoggerFactory.getLogger(_tableNameWithType + "-" +
getClass().getSimpleName());
+ createAndCacheIndexLoadingConfig(tableConfig, schema);
doInit();
@@ -379,21 +382,26 @@ public abstract class BaseTableDataManager implements
TableDataManager {
}
@Override
- public Pair<TableConfig, Schema> fetchTableConfigAndSchema() {
+ public IndexLoadingConfig fetchIndexLoadingConfig() {
TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
Preconditions.checkState(tableConfig != null, "Failed to find table config
for table: %s", _tableNameWithType);
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType);
Preconditions.checkState(schema != null, "Failed to find schema for table:
%s", _tableNameWithType);
- return Pair.of(tableConfig, schema);
+ return createAndCacheIndexLoadingConfig(tableConfig, schema);
}
- @Override
- public IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig,
Schema schema) {
+ private IndexLoadingConfig createAndCacheIndexLoadingConfig(TableConfig
tableConfig, Schema schema) {
IndexLoadingConfig indexLoadingConfig = new
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema);
indexLoadingConfig.setTableDataDir(_tableDataDir);
+ _indexLoadingConfig = indexLoadingConfig;
return indexLoadingConfig;
}
+ @Override
+ public IndexLoadingConfig getIndexLoadingConfig() {
+ return _indexLoadingConfig;
+ }
+
@Override
public void addNewOnlineSegment(SegmentZKMetadata zkMetadata,
IndexLoadingConfig indexLoadingConfig)
throws Exception {
@@ -1258,13 +1266,14 @@ public abstract class BaseTableDataManager implements
TableDataManager {
}
@Override
- public List<StaleSegment> getStaleSegments(TableConfig tableConfig, Schema
schema) {
+ public List<StaleSegment> getStaleSegments() {
+ IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
List<StaleSegment> staleSegments = new ArrayList<>();
List<SegmentDataManager> segmentDataManagers = acquireAllSegments();
- final long startTime = System.currentTimeMillis();
+ long startTimeMs = System.currentTimeMillis();
try {
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
- StaleSegment response = isSegmentStale(tableConfig, schema,
segmentDataManager);
+ StaleSegment response = isSegmentStale(indexLoadingConfig,
segmentDataManager);
if (response.isStale()) {
staleSegments.add(response);
}
@@ -1273,13 +1282,18 @@ public abstract class BaseTableDataManager implements
TableDataManager {
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
releaseSegment(segmentDataManager);
}
- LOGGER.info("Time Taken to get stale segments: {} ms",
System.currentTimeMillis() - startTime);
+ LOGGER.info("Time Taken to get stale segments: {} ms",
System.currentTimeMillis() - startTimeMs);
}
return staleSegments;
}
- protected StaleSegment isSegmentStale(TableConfig tableConfig, Schema
schema, SegmentDataManager segmentDataManager) {
+ @VisibleForTesting
+ StaleSegment isSegmentStale(IndexLoadingConfig indexLoadingConfig,
SegmentDataManager segmentDataManager) {
+ TableConfig tableConfig = indexLoadingConfig.getTableConfig();
+ Schema schema = indexLoadingConfig.getSchema();
+ assert tableConfig != null && schema != null;
+
String tableNameWithType = tableConfig.getTableName();
Map<String, FieldIndexConfigs> indexConfigsMap =
FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig, schema);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
index 5e36d847e4..442c62567c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
@@ -35,9 +35,9 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -108,20 +108,20 @@ public class DimensionTableDataManager extends
OfflineTableDataManager {
@Override
protected void doInit() {
super.doInit();
- Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType);
- Preconditions.checkState(schema != null, "Failed to find schema for
dimension table: %s", _tableNameWithType);
+ IndexLoadingConfig indexLoadingConfig = _indexLoadingConfig;
+ Schema schema = indexLoadingConfig.getSchema();
+ assert schema != null;
List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
"Primary key columns must be configured for dimension table: %s",
_tableNameWithType);
- TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
- if (tableConfig != null) {
- DimensionTableConfig dimensionTableConfig =
tableConfig.getDimensionTableConfig();
- if (dimensionTableConfig != null) {
- _disablePreload = dimensionTableConfig.isDisablePreload();
- _errorOnDuplicatePrimaryKey =
dimensionTableConfig.isErrorOnDuplicatePrimaryKey();
- }
+ TableConfig tableConfig = _indexLoadingConfig.getTableConfig();
+ assert tableConfig != null;
+ DimensionTableConfig dimensionTableConfig =
tableConfig.getDimensionTableConfig();
+ if (dimensionTableConfig != null) {
+ _disablePreload = dimensionTableConfig.isDisablePreload();
+ _errorOnDuplicatePrimaryKey =
dimensionTableConfig.isErrorOnDuplicatePrimaryKey();
}
if (_disablePreload) {
@@ -206,8 +206,8 @@ public class DimensionTableDataManager extends
OfflineTableDataManager {
// loading is in progress.
int token = _loadToken.incrementAndGet();
- Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType);
- Preconditions.checkState(schema != null, "Failed to find schema for
dimension table: %s", _tableNameWithType);
+ Schema schema = _indexLoadingConfig.getSchema();
+ assert schema != null;
List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
"Primary key columns must be configured for dimension table: %s",
_tableNameWithType);
@@ -282,8 +282,8 @@ public class DimensionTableDataManager extends
OfflineTableDataManager {
// loading is in progress.
int token = _loadToken.incrementAndGet();
- Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType);
- Preconditions.checkState(schema != null, "Failed to find schema for
dimension table: %s", _tableNameWithType);
+ Schema schema = _indexLoadingConfig.getSchema();
+ assert schema != null;
List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
"Primary key columns must be configured for dimension table: %s",
_tableNameWithType);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
index 186736002d..b750016f74 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
@@ -37,6 +37,7 @@ import
org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -64,8 +65,9 @@ public class DefaultTableDataManagerProvider implements
TableDataManagerProvider
}
@Override
- public TableDataManager getTableDataManager(TableConfig tableConfig,
SegmentReloadSemaphore segmentReloadSemaphore,
- ExecutorService segmentReloadExecutor, @Nullable ExecutorService
segmentPreloadExecutor,
+ public TableDataManager getTableDataManager(TableConfig tableConfig, Schema
schema,
+ SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService
segmentReloadExecutor,
+ @Nullable ExecutorService segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
Supplier<Boolean> isServerReadyToServeQueries) {
TableDataManager tableDataManager;
@@ -90,8 +92,8 @@ public class DefaultTableDataManagerProvider implements
TableDataManagerProvider
default:
throw new IllegalStateException();
}
- tableDataManager.init(_instanceDataManagerConfig, _helixManager,
_segmentLocks, tableConfig, segmentReloadSemaphore,
- segmentReloadExecutor, segmentPreloadExecutor, errorCache,
_segmentOperationsThrottler);
+ tableDataManager.init(_instanceDataManagerConfig, _helixManager,
_segmentLocks, tableConfig, schema,
+ segmentReloadSemaphore, segmentReloadExecutor, segmentPreloadExecutor,
errorCache, _segmentOperationsThrottler);
return tableDataManager;
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
index 59e5a75f8c..63bfed0b0e 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
@@ -34,6 +34,7 @@ import
org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
/**
@@ -45,14 +46,15 @@ public interface TableDataManagerProvider {
void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager
helixManager, SegmentLocks segmentLocks,
@Nullable SegmentOperationsThrottler segmentOperationsThrottler);
- TableDataManager getTableDataManager(TableConfig tableConfig,
SegmentReloadSemaphore segmentRefreshSemaphore,
- ExecutorService segmentRefreshExecutor, @Nullable ExecutorService
segmentPreloadExecutor,
+ TableDataManager getTableDataManager(TableConfig tableConfig, Schema schema,
+ SegmentReloadSemaphore segmentRefreshSemaphore, ExecutorService
segmentRefreshExecutor,
+ @Nullable ExecutorService segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
Supplier<Boolean> isServerReadyToServeQueries);
@VisibleForTesting
- default TableDataManager getTableDataManager(TableConfig tableConfig) {
- return getTableDataManager(tableConfig, new SegmentReloadSemaphore(1),
Executors.newSingleThreadExecutor(), null,
- null, () -> true);
+ default TableDataManager getTableDataManager(TableConfig tableConfig, Schema
schema) {
+ return getTableDataManager(tableConfig, schema, new
SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
+ null, null, () -> true);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 5d212530b2..0db35b57a4 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -43,7 +43,6 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.Utils;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
@@ -201,28 +200,30 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
// Set up dedup/upsert metadata manager
// NOTE: Dedup/upsert has to be set up when starting the server. Changing
the table config without restarting the
// server won't enable/disable them on the fly.
- DedupConfig dedupConfig = _tableConfig.getDedupConfig();
+ IndexLoadingConfig indexLoadingConfig = _indexLoadingConfig;
+ TableConfig tableConfig = indexLoadingConfig.getTableConfig();
+ assert tableConfig != null;
+ DedupConfig dedupConfig = tableConfig.getDedupConfig();
boolean dedupEnabled = dedupConfig != null && dedupConfig.isDedupEnabled();
if (dedupEnabled) {
- Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType);
- Preconditions.checkState(schema != null, "Failed to find schema for
table: %s", _tableNameWithType);
-
+ Schema schema = indexLoadingConfig.getSchema();
+ assert schema != null;
List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
Preconditions.checkState(!CollectionUtils.isEmpty(primaryKeyColumns),
"Primary key columns must be configured for dedup");
- _tableDedupMetadataManager =
TableDedupMetadataManagerFactory.create(_tableConfig, schema, this,
_serverMetrics,
+ _tableDedupMetadataManager =
TableDedupMetadataManagerFactory.create(tableConfig, schema, this,
_serverMetrics,
_instanceDataManagerConfig.getDedupConfig());
}
- UpsertConfig upsertConfig = _tableConfig.getUpsertConfig();
+ UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
if (upsertConfig != null && upsertConfig.getMode() !=
UpsertConfig.Mode.NONE) {
Preconditions.checkState(!dedupEnabled, "Dedup and upsert cannot be both
enabled for table: %s",
_tableUpsertMetadataManager);
- Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType);
- Preconditions.checkState(schema != null, "Failed to find schema for
table: %s", _tableNameWithType);
+ Schema schema = indexLoadingConfig.getSchema();
+ assert schema != null;
_tableUpsertMetadataManager =
- TableUpsertMetadataManagerFactory.create(_tableConfig,
_instanceDataManagerConfig.getUpsertConfig());
- _tableUpsertMetadataManager.init(_tableConfig, schema, this);
+ TableUpsertMetadataManagerFactory.create(tableConfig,
_instanceDataManagerConfig.getUpsertConfig());
+ _tableUpsertMetadataManager.init(tableConfig, schema, this);
}
_enforceConsumptionInOrder = isEnforceConsumptionInOrder();
@@ -594,8 +595,8 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
String segmentName = zkMetadata.getSegmentName();
Preconditions.checkState(status == Status.COMMITTING, "Invalid status: %s
for segment: %s to be downloaded", status,
segmentName);
- TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
- Preconditions.checkState(tableConfig != null, "Failed to find table config
for table: %s", _tableNameWithType);
+ TableConfig tableConfig = _indexLoadingConfig.getTableConfig();
+ assert tableConfig != null;
long downloadTimeoutMs = getDownloadTimeoutMs(tableConfig);
long deadlineMs = System.currentTimeMillis() + downloadTimeoutMs;
while (System.currentTimeMillis() < deadlineMs) {
@@ -868,7 +869,9 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
@Nullable
public StreamIngestionConfig getStreamIngestionConfig() {
- IngestionConfig ingestionConfig = _tableConfig.getIngestionConfig();
+ TableConfig tableConfig = _indexLoadingConfig.getTableConfig();
+ assert tableConfig != null;
+ IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
return ingestionConfig != null ?
ingestionConfig.getStreamIngestionConfig() : null;
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
index ad43c7c299..c861c13fc4 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
@@ -47,6 +47,7 @@ import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
@@ -123,11 +124,12 @@ public class BaseTableDataManagerAcquireSegmentTest {
when(instanceDataManagerConfig.getDeletedSegmentsCacheSize()).thenReturn(DELETED_SEGMENTS_CACHE_SIZE);
when(instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes()).thenReturn(DELETED_SEGMENTS_TTL_MINUTES);
TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
- SegmentOperationsThrottler segmentOperationsThrottler = new
SegmentOperationsThrottler(
- new SegmentAllIndexPreprocessThrottler(8, 10, true), new
SegmentStarTreePreprocessThrottler(4, 8, true),
- new SegmentDownloadThrottler(10, 20, true));
+ Schema schema = new
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).build();
+ SegmentOperationsThrottler segmentOperationsThrottler =
+ new SegmentOperationsThrottler(new
SegmentAllIndexPreprocessThrottler(8, 10, true),
+ new SegmentStarTreePreprocessThrottler(4, 8, true), new
SegmentDownloadThrottler(10, 20, true));
TableDataManager tableDataManager = new OfflineTableDataManager();
- tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class),
new SegmentLocks(), tableConfig,
+ tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class),
new SegmentLocks(), tableConfig, schema,
new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
null, null, segmentOperationsThrottler);
tableDataManager.start();
Field segsMapField =
BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
index 5c4d4d347b..451d5ed9cf 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
@@ -84,6 +84,7 @@ public class BaseTableDataManagerNeedRefreshTest {
private static final TableConfig TABLE_CONFIG;
private static final Schema SCHEMA;
+ private static final IndexLoadingConfig INDEX_LOADING_CONFIG;
private static final ImmutableSegmentDataManager
IMMUTABLE_SEGMENT_DATA_MANAGER;
private static final BaseTableDataManager BASE_TABLE_DATA_MANAGER;
@@ -93,8 +94,9 @@ public class BaseTableDataManagerNeedRefreshTest {
try {
TABLE_CONFIG = getTableConfigBuilder().build();
SCHEMA = getSchema();
+ INDEX_LOADING_CONFIG = new IndexLoadingConfig(TABLE_CONFIG, SCHEMA);
IMMUTABLE_SEGMENT_DATA_MANAGER =
- createImmutableSegmentDataManager(TABLE_CONFIG, SCHEMA,
"basicSegment", generateRows());
+ createImmutableSegmentDataManager(INDEX_LOADING_CONFIG,
"basicSegment", generateRows());
BASE_TABLE_DATA_MANAGER = BaseTableDataManagerTest.createTableManager();
} catch (Exception e) {
throw new RuntimeException(e);
@@ -103,7 +105,8 @@ public class BaseTableDataManagerNeedRefreshTest {
protected static TableConfigBuilder getTableConfigBuilder() {
return new
TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME)
-
.setTimeColumnName(DEFAULT_TIME_COLUMN_NAME).setNullHandlingEnabled(true)
+ .setTimeColumnName(DEFAULT_TIME_COLUMN_NAME)
+ .setNullHandlingEnabled(true)
.setNoDictionaryColumns(List.of(TEXT_INDEX_COLUMN));
}
@@ -119,7 +122,8 @@ public class BaseTableDataManagerNeedRefreshTest {
.addSingleValueDimension(FST_TEST_COLUMN, FieldSpec.DataType.STRING)
.addSingleValueDimension(NULL_VALUE_COLUMN, FieldSpec.DataType.STRING)
.addSingleValueDimension(DISTANCE_COLUMN_NAME, FieldSpec.DataType.INT)
- .addSingleValueDimension(CARRIER_COLUMN_NAME,
FieldSpec.DataType.STRING).build();
+ .addSingleValueDimension(CARRIER_COLUMN_NAME,
FieldSpec.DataType.STRING)
+ .build();
}
protected static List<GenericRow> generateRows() {
@@ -159,10 +163,10 @@ public class BaseTableDataManagerNeedRefreshTest {
return List.of(row0, row2, row1);
}
- private static File createSegment(TableConfig tableConfig, Schema schema,
- String segmentName, List<GenericRow> rows)
+ private static File createSegment(IndexLoadingConfig indexLoadingConfig,
String segmentName, List<GenericRow> rows)
throws Exception {
- SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig,
schema);
+ SegmentGeneratorConfig config =
+ new SegmentGeneratorConfig(indexLoadingConfig.getTableConfig(),
indexLoadingConfig.getSchema());
config.setOutDir(TABLE_DATA_DIR.getAbsolutePath());
config.setSegmentName(segmentName);
config.setSegmentVersion(SegmentVersion.v3);
@@ -174,14 +178,12 @@ public class BaseTableDataManagerNeedRefreshTest {
return new File(TABLE_DATA_DIR, segmentName);
}
- private static ImmutableSegmentDataManager
createImmutableSegmentDataManager(TableConfig tableConfig, Schema schema,
+ private static ImmutableSegmentDataManager
createImmutableSegmentDataManager(IndexLoadingConfig indexLoadingConfig,
String segmentName, List<GenericRow> rows)
throws Exception {
ImmutableSegmentDataManager segmentDataManager =
mock(ImmutableSegmentDataManager.class);
when(segmentDataManager.getSegmentName()).thenReturn(segmentName);
- File indexDir = createSegment(tableConfig, schema, segmentName, rows);
-
- IndexLoadingConfig indexLoadingConfig = new
IndexLoadingConfig(tableConfig, schema);
+ File indexDir = createSegment(indexLoadingConfig, segmentName, rows);
ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(indexDir,
indexLoadingConfig,
BaseTableDataManagerTest.SEGMENT_OPERATIONS_THROTTLER);
when(segmentDataManager.getSegment()).thenReturn(immutableSegment);
@@ -196,13 +198,15 @@ public class BaseTableDataManagerNeedRefreshTest {
@Test
void testAddTimeColumn()
throws Exception {
- TableConfig tableConfig =
- new
TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME).setNullHandlingEnabled(true)
-
.setNoDictionaryColumns(Collections.singletonList(TEXT_INDEX_COLUMN)).build();
-
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME)
+ .setNullHandlingEnabled(true)
+ .setNoDictionaryColumns(Collections.singletonList(TEXT_INDEX_COLUMN))
+ .build();
Schema schema = new
Schema.SchemaBuilder().addSingleValueDimension(TEXT_INDEX_COLUMN,
FieldSpec.DataType.STRING)
.addSingleValueDimension(JSON_INDEX_COLUMN, FieldSpec.DataType.JSON)
- .addSingleValueDimension(FST_TEST_COLUMN,
FieldSpec.DataType.STRING).build();
+ .addSingleValueDimension(FST_TEST_COLUMN, FieldSpec.DataType.STRING)
+ .build();
+ IndexLoadingConfig indexLoadingConfig = new
IndexLoadingConfig(tableConfig, schema);
GenericRow row = new GenericRow();
row.putValue(TEXT_INDEX_COLUMN, "text_index_column");
@@ -210,23 +214,22 @@ public class BaseTableDataManagerNeedRefreshTest {
row.putValue(FST_TEST_COLUMN, "fst_test_column");
ImmutableSegmentDataManager segmentDataManager =
- createImmutableSegmentDataManager(tableConfig, schema, "noChanges",
List.of(row));
+ createImmutableSegmentDataManager(indexLoadingConfig, "noChanges",
List.of(row));
BaseTableDataManager tableDataManager =
BaseTableDataManagerTest.createTableManager();
- StaleSegment response =
- tableDataManager.isSegmentStale(tableConfig, schema,
segmentDataManager);
+ StaleSegment response =
tableDataManager.isSegmentStale(indexLoadingConfig, segmentDataManager);
assertFalse(response.isStale());
// Test new time column
- response =
tableDataManager.isSegmentStale(getTableConfigBuilder().build(), getSchema(),
segmentDataManager);
+ response = tableDataManager.isSegmentStale(INDEX_LOADING_CONFIG,
segmentDataManager);
assertTrue(response.isStale());
assertEquals(response.getReason(), "time column");
}
@Test
void testChangeTimeColumn() {
- StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(
-
getTableConfigBuilder().setTimeColumnName(MS_SINCE_EPOCH_COLUMN_NAME).build(),
SCHEMA,
+ TableConfig tableConfig =
getTableConfigBuilder().setTimeColumnName(MS_SINCE_EPOCH_COLUMN_NAME).build();
+ StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(tableConfig, SCHEMA),
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "time column");
@@ -237,8 +240,8 @@ public class BaseTableDataManagerNeedRefreshTest {
throws Exception {
Schema schema = getSchema();
schema.removeField(TEXT_INDEX_COLUMN);
- StaleSegment response =
- BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema,
IMMUTABLE_SEGMENT_DATA_MANAGER);
+ StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(TABLE_CONFIG, schema),
+ IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "column deleted: textColumn");
}
@@ -249,9 +252,8 @@ public class BaseTableDataManagerNeedRefreshTest {
Schema schema = getSchema();
schema.removeField(TEXT_INDEX_COLUMN);
schema.addField(new MetricFieldSpec(TEXT_INDEX_COLUMN,
FieldSpec.DataType.STRING, true));
-
- StaleSegment response =
- BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema,
IMMUTABLE_SEGMENT_DATA_MANAGER);
+ StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(TABLE_CONFIG, schema),
+ IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "field type changed: textColumn");
}
@@ -262,9 +264,8 @@ public class BaseTableDataManagerNeedRefreshTest {
Schema schema = getSchema();
schema.removeField(TEXT_INDEX_COLUMN);
schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN,
FieldSpec.DataType.INT, true));
-
- StaleSegment response =
- BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema,
IMMUTABLE_SEGMENT_DATA_MANAGER);
+ StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(TABLE_CONFIG, schema),
+ IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "data type changed: textColumn");
}
@@ -275,9 +276,8 @@ public class BaseTableDataManagerNeedRefreshTest {
Schema schema = getSchema();
schema.removeField(TEXT_INDEX_COLUMN);
schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN,
FieldSpec.DataType.STRING, false));
-
- StaleSegment response =
- BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema,
IMMUTABLE_SEGMENT_DATA_MANAGER);
+ StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(TABLE_CONFIG, schema),
+ IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "single / multi value changed:
textColumn");
}
@@ -288,9 +288,8 @@ public class BaseTableDataManagerNeedRefreshTest {
Schema schema = getSchema();
schema.removeField(TEXT_INDEX_COLUMN_MV);
schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN_MV,
FieldSpec.DataType.STRING, true));
-
- StaleSegment response =
- BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema,
IMMUTABLE_SEGMENT_DATA_MANAGER);
+ StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(TABLE_CONFIG, schema),
+ IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "single / multi value changed:
textColumnMV");
}
@@ -298,16 +297,14 @@ public class BaseTableDataManagerNeedRefreshTest {
@Test
void testSortColumnMismatch() {
// Check with a column that is not sorted
- StaleSegment response =
- BASE_TABLE_DATA_MANAGER.isSegmentStale(
-
getTableConfigBuilder().setSortedColumn(MS_SINCE_EPOCH_COLUMN_NAME).build(),
- SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER);
+ TableConfig tableConfig =
getTableConfigBuilder().setSortedColumn(MS_SINCE_EPOCH_COLUMN_NAME).build();
+ IndexLoadingConfig indexLoadingConfig = new
IndexLoadingConfig(tableConfig, SCHEMA);
+ StaleSegment response =
BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig,
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "sort column changed:
MilliSecondsSinceEpoch");
// Check with a column that is sorted
- assertFalse(
-
BASE_TABLE_DATA_MANAGER.isSegmentStale(getTableConfigBuilder().setSortedColumn(TEXT_INDEX_COLUMN).build(),
- SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER).isStale());
+
tableConfig.getIndexingConfig().setSortedColumn(List.of(TEXT_INDEX_COLUMN));
+ assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig,
IMMUTABLE_SEGMENT_DATA_MANAGER).isStale());
}
@DataProvider(name = "testFilterArgs")
@@ -325,9 +322,9 @@ public class BaseTableDataManagerNeedRefreshTest {
null, null))).build(), "text index changed: textColumn"
}, {
"withFstIndex", getTableConfigBuilder().setFieldConfigList(List.of(
- new FieldConfig(FST_TEST_COLUMN, FieldConfig.EncodingType.DICTIONARY,
List.of(FieldConfig.IndexType.FST),
- null, Map.of(FieldConfig.TEXT_FST_TYPE,
FieldConfig.TEXT_NATIVE_FST_LITERAL)))).build(),
- "fst index changed: DestCityName"
+ new FieldConfig(FST_TEST_COLUMN, FieldConfig.EncodingType.DICTIONARY,
List.of(FieldConfig.IndexType.FST), null,
+ Map.of(FieldConfig.TEXT_FST_TYPE,
+ FieldConfig.TEXT_NATIVE_FST_LITERAL)))).build(), "fst index
changed: DestCityName"
}, {
"withRangeFilter", getTableConfigBuilder().setRangeIndexColumns(
List.of(MS_SINCE_EPOCH_COLUMN_NAME)).build(), "range index changed:
MilliSecondsSinceEpoch"
@@ -338,22 +335,22 @@ public class BaseTableDataManagerNeedRefreshTest {
@Test(dataProvider = "testFilterArgs")
void testFilter(String segmentName, TableConfig tableConfigWithFilter,
String expectedReason)
throws Exception {
+ IndexLoadingConfig indexLoadingConfig = new
IndexLoadingConfig(tableConfigWithFilter, SCHEMA);
ImmutableSegmentDataManager segmentWithFilter =
- createImmutableSegmentDataManager(tableConfigWithFilter, SCHEMA,
segmentName, generateRows());
+ createImmutableSegmentDataManager(indexLoadingConfig, segmentName,
generateRows());
// When TableConfig has a filter but segment does not have, needRefresh is
true.
- StaleSegment response =
- BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfigWithFilter, SCHEMA,
IMMUTABLE_SEGMENT_DATA_MANAGER);
+ StaleSegment response =
BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig,
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), expectedReason);
// When TableConfig does not have a filter but segment has, needRefresh is
true
- response = BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, SCHEMA,
segmentWithFilter);
+ response = BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG,
segmentWithFilter);
assertTrue(response.isStale());
assertEquals(response.getReason(), expectedReason);
// When TableConfig has a filter AND segment also has a filter,
needRefresh is false
- assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfigWithFilter,
SCHEMA, segmentWithFilter).isStale());
+ assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig,
segmentWithFilter).isStale());
}
@Test
@@ -361,23 +358,23 @@ public class BaseTableDataManagerNeedRefreshTest {
throws Exception {
TableConfig partitionedTableConfig =
getTableConfigBuilder().setSegmentPartitionConfig(new SegmentPartitionConfig(
Map.of(PARTITIONED_COLUMN_NAME, new
ColumnPartitionConfig(PARTITION_FUNCTION_NAME, NUM_PARTITIONS)))).build();
+ IndexLoadingConfig indexLoadingConfig = new
IndexLoadingConfig(partitionedTableConfig, SCHEMA);
ImmutableSegmentDataManager segmentWithPartition =
- createImmutableSegmentDataManager(partitionedTableConfig, SCHEMA,
"partitionWithModulo", generateRows());
+ createImmutableSegmentDataManager(indexLoadingConfig,
"partitionWithModulo", generateRows());
// when segment has no partition AND tableConfig has partitions then
needRefresh = true
- StaleSegment response =
- BASE_TABLE_DATA_MANAGER.isSegmentStale(partitionedTableConfig, SCHEMA,
IMMUTABLE_SEGMENT_DATA_MANAGER);
+ StaleSegment response =
BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig,
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "partition function added:
partitionedColumn");
// when segment has partitions AND tableConfig has no partitions, then
needRefresh = false
- assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, SCHEMA,
segmentWithPartition).isStale());
+ assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG,
segmentWithPartition).isStale());
// when # of partitions is different, then needRefresh = true
TableConfig partitionedTableConfig40 =
getTableConfigBuilder().setSegmentPartitionConfig(new SegmentPartitionConfig(
Map.of(PARTITIONED_COLUMN_NAME, new
ColumnPartitionConfig(PARTITION_FUNCTION_NAME, 40)))).build();
-
- response =
BASE_TABLE_DATA_MANAGER.isSegmentStale(partitionedTableConfig40, SCHEMA,
segmentWithPartition);
+ response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(partitionedTableConfig40, SCHEMA),
+ segmentWithPartition);
assertTrue(response.isStale());
assertEquals(response.getReason(), "num partitions changed:
partitionedColumn");
@@ -385,8 +382,8 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig partitionedTableConfigMurmur =
getTableConfigBuilder().setSegmentPartitionConfig(
new SegmentPartitionConfig(
Map.of(PARTITIONED_COLUMN_NAME, new
ColumnPartitionConfig("murmur", NUM_PARTITIONS)))).build();
-
- response =
BASE_TABLE_DATA_MANAGER.isSegmentStale(partitionedTableConfigMurmur, SCHEMA,
segmentWithPartition);
+ response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(partitionedTableConfigMurmur, SCHEMA),
+ segmentWithPartition);
assertTrue(response.isStale());
assertEquals(response.getReason(), "partition function name changed:
partitionedColumn");
}
@@ -395,36 +392,33 @@ public class BaseTableDataManagerNeedRefreshTest {
void testNullValueVector()
throws Exception {
TableConfig withoutNullHandling =
getTableConfigBuilder().setNullHandlingEnabled(false).build();
+ IndexLoadingConfig indexLoadingConfig = new
IndexLoadingConfig(withoutNullHandling, SCHEMA);
ImmutableSegmentDataManager segmentWithoutNullHandling =
- createImmutableSegmentDataManager(withoutNullHandling, SCHEMA,
"withoutNullHandling", generateRows());
+ createImmutableSegmentDataManager(indexLoadingConfig,
"withoutNullHandling", generateRows());
// If null handling is removed from table config AND segment has NVV, then
NVV can be removed. needRefresh = true
- StaleSegment response =
- BASE_TABLE_DATA_MANAGER.isSegmentStale(withoutNullHandling, SCHEMA,
IMMUTABLE_SEGMENT_DATA_MANAGER);
+ StaleSegment response =
BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig,
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "null value vector index removed from
column: NullValueColumn");
// if NVV is added to table config AND segment does not have NVV, then it
cannot be added. needRefresh = false
- assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, SCHEMA,
segmentWithoutNullHandling).isStale());
+ assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG,
segmentWithoutNullHandling).isStale());
}
@Test
- // Test 1 : Adding a StarTree index should trigger segment refresh.
- public void addStartreeIndex()
- throws Exception {
+ public void addStartreeIndex() {
+ // Test 1 : Adding a StarTree index should trigger segment refresh.
+
StarTreeIndexConfig starTreeIndexConfig = new
StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 100);
-
TableConfig tableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
- ImmutableSegmentDataManager segmentDataManager =
- createImmutableSegmentDataManager(getTableConfigBuilder().build(),
SCHEMA, _testName, generateRows());
- assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfig, SCHEMA,
segmentDataManager).isStale());
+ assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(tableConfig, SCHEMA),
+ IMMUTABLE_SEGMENT_DATA_MANAGER).isStale());
}
@Test
public void testStarTreeIndexWithDifferentColumn()
throws Exception {
-
// Test 2: Adding a new StarTree index with split dimension column of same
size but with different element should
// trigger segment refresh.
@@ -433,20 +427,21 @@ public class BaseTableDataManagerNeedRefreshTest {
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 100);
TableConfig tableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
ImmutableSegmentDataManager segmentDataManager =
- createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName,
generateRows());
+ createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig,
SCHEMA), _testName, generateRows());
// Create a StarTree index on Distance.
StarTreeIndexConfig newStarTreeIndexConfig = new
StarTreeIndexConfig(Collections.singletonList("Distance"), null,
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 100);
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
- assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newTableConfig, SCHEMA,
segmentDataManager).isStale());
+ assertTrue(
+ BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ .isStale());
}
@Test
public void testStarTreeIndexWithManyColumns()
throws Exception {
-
// Test 3: Adding a new StarTree index with split dimension columns of
different size should trigger segment
// refresh.
@@ -455,19 +450,20 @@ public class BaseTableDataManagerNeedRefreshTest {
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 100);
TableConfig tableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
ImmutableSegmentDataManager segmentDataManager =
- createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName,
generateRows());
+ createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig,
SCHEMA), _testName, generateRows());
StarTreeIndexConfig newStarTreeIndexConfig = new
StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 100);
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
- assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newTableConfig, SCHEMA,
segmentDataManager).isStale());
+ assertTrue(
+ BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ .isStale());
}
@Test
public void testStartIndexWithDifferentOrder()
throws Exception {
-
// Test 4: Adding a new StarTree index with the differently ordered split
dimension columns should trigger
// segment refresh.
@@ -476,33 +472,39 @@ public class BaseTableDataManagerNeedRefreshTest {
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 100);
TableConfig tableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
ImmutableSegmentDataManager segmentDataManager =
- createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName,
generateRows());
+ createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig,
SCHEMA), _testName, generateRows());
// Create a StarTree index.
StarTreeIndexConfig newStarTreeIndexConfig = new
StarTreeIndexConfig(Arrays.asList("Distance", "Carrier"), null,
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 100);
- TableConfig newConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
- assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA,
segmentDataManager).isStale());
+ TableConfig newTableConfig =
+
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+ assertTrue(
+ BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ .isStale());
}
@Test
void testStarTreeIndexWithSkipDimCols()
throws Exception {
-
// Test 5: Adding a new StarTree index with skipped dimension columns
should trigger segment refresh.
// Create a segment with StarTree index on Carrier, Distance.
+
StarTreeIndexConfig starTreeIndexConfig = new
StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 100);
TableConfig tableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
ImmutableSegmentDataManager segmentDataManager =
- createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName,
generateRows());
+ createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig,
SCHEMA), _testName, generateRows());
// Create a StarTree index.
StarTreeIndexConfig newStarTreeIndexConfig =
new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"),
Arrays.asList("Carrier", "Distance"),
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 100);
- TableConfig newConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
- assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA,
segmentDataManager).isStale());
+ TableConfig newTableConfig =
+
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+ assertTrue(
+ BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ .isStale());
}
@Test
@@ -510,35 +512,43 @@ public class BaseTableDataManagerNeedRefreshTest {
throws Exception {
// Test 6: Adding a new StarTree index with skipped dimension columns in
different order should not trigger
// segment refresh.
+
StarTreeIndexConfig starTreeIndexConfig =
new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"),
Arrays.asList("Carrier", "Distance"),
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 100);
TableConfig tableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
ImmutableSegmentDataManager segmentDataManager =
- createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName,
generateRows());
+ createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig,
SCHEMA), _testName, generateRows());
StarTreeIndexConfig newStarTreeIndexConfig =
new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"),
Arrays.asList("Distance", "Carrier"),
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 100);
- TableConfig newConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
- assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA,
segmentDataManager).isStale());
+ TableConfig newTableConfig =
+
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+ assertFalse(
+ BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ .isStale());
}
@Test
void testStarTreeIndexRemoveSkipDimCols()
throws Exception {
// Test 7: Adding a new StarTree index with removed skipped-dimension
column should trigger segment refresh.
+
StarTreeIndexConfig starTreeIndexConfig =
new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"),
Arrays.asList("Carrier", "Distance"),
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 100);
TableConfig tableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
ImmutableSegmentDataManager segmentDataManager =
- createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName,
generateRows());
+ createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig,
SCHEMA), _testName, generateRows());
StarTreeIndexConfig newStarTreeIndexConfig = new
StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 100);
- TableConfig newConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
- assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA,
segmentDataManager).isStale());
+ TableConfig newTableConfig =
+
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+ assertTrue(
+ BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ .isStale());
}
@Test
@@ -548,15 +558,17 @@ public class BaseTableDataManagerNeedRefreshTest {
StarTreeIndexConfig starTreeIndex = new
StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 100);
-
TableConfig tableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndex)).build();
ImmutableSegmentDataManager segmentDataManager =
- createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName,
generateRows());
+ createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig,
SCHEMA), _testName, generateRows());
- StarTreeIndexConfig starTreeIndexAddAggFn = new
StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
+ StarTreeIndexConfig newStarTreeIndexConfig = new
StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
Arrays.asList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName(),
"MAX__Distance"), null, 100);
- TableConfig newConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexAddAggFn)).build();
- assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA,
segmentDataManager).isStale());
+ TableConfig newTableConfig =
+
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+ assertTrue(
+ BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ .isStale());
}
@Test
@@ -569,12 +581,15 @@ public class BaseTableDataManagerNeedRefreshTest {
Arrays.asList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName(),
"MAX__Distance"), null, 100);
TableConfig tableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
ImmutableSegmentDataManager segmentDataManager =
- createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName,
generateRows());
+ createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig,
SCHEMA), _testName, generateRows());
StarTreeIndexConfig newStarTreeIndexConfig = new
StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
Arrays.asList("MAX__Distance",
AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
- TableConfig newConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
- assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA,
segmentDataManager).isStale());
+ TableConfig newTableConfig =
+
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+ assertFalse(
+ BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ .isStale());
}
@Test
@@ -587,32 +602,39 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig tableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
ImmutableSegmentDataManager segmentDataManager =
- createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName,
generateRows());
+ createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig,
SCHEMA), _testName, generateRows());
StarTreeIndexConfig newStarTreeIndexConfig =
new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
null,
List.of(new StarTreeAggregationConfig("Distance", "MAX")), 100);
- TableConfig newConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
- assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA,
segmentDataManager).isStale());
+ TableConfig newTableConfig =
+
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+ assertTrue(
+ BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ .isStale());
}
@Test
void testStarTreeIndexNewMetricAgg()
throws Exception {
// Test 11 : Adding a new metric aggregation function through
functionColumnPairs should trigger segment refresh.
+
StarTreeAggregationConfig aggregationConfig = new
StarTreeAggregationConfig("Distance", "MAX");
StarTreeIndexConfig starTreeIndexConfig =
new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
null, List.of(aggregationConfig), 100);
TableConfig tableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
ImmutableSegmentDataManager segmentDataManager =
- createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName,
generateRows());
+ createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig,
SCHEMA), _testName, generateRows());
// Create a StarTree index.
StarTreeIndexConfig newStarTreeIndexConfig = new
StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
Collections.singletonList(aggregationConfig), 100);
- TableConfig newConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
- assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA,
segmentDataManager).isStale());
+ TableConfig newTableConfig =
+
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+ assertTrue(
+ BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ .isStale());
}
@Test
@@ -627,14 +649,17 @@ public class BaseTableDataManagerNeedRefreshTest {
Collections.singletonList(aggregationConfig), 100);
TableConfig tableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
ImmutableSegmentDataManager segmentDataManager =
- createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName,
generateRows());
+ createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig,
SCHEMA), _testName, generateRows());
StarTreeAggregationConfig starTreeAggregationConfig2 = new
StarTreeAggregationConfig("*", "count");
StarTreeIndexConfig newStarTreeIndexConfig =
new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
null,
Arrays.asList(starTreeAggregationConfig2, aggregationConfig), 100);
- TableConfig newConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
- assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA,
segmentDataManager).isStale());
+ TableConfig newTableConfig =
+
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+ assertFalse(
+ BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ .isStale());
}
@Test
@@ -644,12 +669,15 @@ public class BaseTableDataManagerNeedRefreshTest {
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 100);
TableConfig tableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
ImmutableSegmentDataManager segmentDataManager =
- createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName,
generateRows());
+ createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig,
SCHEMA), _testName, generateRows());
StarTreeIndexConfig newStarTreeIndexConfig = new
StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 10);
- TableConfig newConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
- assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA,
segmentDataManager).isStale());
+ TableConfig newTableConfig =
+
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+ assertTrue(
+ BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ .isStale());
}
@Test
@@ -659,9 +687,8 @@ public class BaseTableDataManagerNeedRefreshTest {
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 100);
TableConfig tableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
ImmutableSegmentDataManager segmentDataManager =
- createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName,
generateRows());
- assertTrue(
-
BASE_TABLE_DATA_MANAGER.isSegmentStale(getTableConfigBuilder().build(), SCHEMA,
segmentDataManager).isStale());
+ createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig,
SCHEMA), _testName, generateRows());
+ assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG,
segmentDataManager).isStale());
}
@Test
@@ -673,13 +700,15 @@ public class BaseTableDataManagerNeedRefreshTest {
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 100);
TableConfig tableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
ImmutableSegmentDataManager segmentDataManager =
- createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName,
generateRows());
+ createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig,
SCHEMA), _testName, generateRows());
StarTreeIndexConfig newStarTreeIndexConfig = new
StarTreeIndexConfig(Collections.singletonList("Distance"), null,
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 100);
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig,
newStarTreeIndexConfig)).build();
- assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newTableConfig, SCHEMA,
segmentDataManager).isStale());
+ assertTrue(
+ BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ .isStale());
}
@Test
@@ -691,11 +720,13 @@ public class BaseTableDataManagerNeedRefreshTest {
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 100);
TableConfig tableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
ImmutableSegmentDataManager segmentDataManager =
- createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName,
generateRows());
+ createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig,
SCHEMA), _testName, generateRows());
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
newTableConfig.getIndexingConfig().setEnableDefaultStarTree(true);
- assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newTableConfig, SCHEMA,
segmentDataManager).isStale());
+ assertTrue(
+ BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ .isStale());
}
@Test
@@ -706,10 +737,10 @@ public class BaseTableDataManagerNeedRefreshTest {
StarTreeIndexConfig starTreeIndexConfig = new
StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
null, 100);
TableConfig tableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+ IndexLoadingConfig indexLoadingConfig = new
IndexLoadingConfig(tableConfig, SCHEMA);
ImmutableSegmentDataManager segmentDataManager =
- createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName,
generateRows());
-
- assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfig, SCHEMA,
segmentDataManager).isStale());
+ createImmutableSegmentDataManager(indexLoadingConfig, _testName,
generateRows());
+ assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig,
segmentDataManager).isStale());
}
@Test
@@ -722,10 +753,12 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig tableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
tableConfig.getIndexingConfig().setEnableDefaultStarTree(true);
ImmutableSegmentDataManager segmentDataManager =
- createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName,
generateRows());
+ createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig,
SCHEMA), _testName, generateRows());
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
newTableConfig.getIndexingConfig().setEnableDefaultStarTree(false);
- assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newTableConfig, SCHEMA,
segmentDataManager).isStale());
+ assertTrue(
+ BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ .isStale());
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index c992c800da..163cd8a0c5 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -664,7 +664,8 @@ public class BaseTableDataManagerTest {
private static OfflineTableDataManager
createTableManager(InstanceDataManagerConfig instanceDataManagerConfig) {
OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class),
new SegmentLocks(), DEFAULT_TABLE_CONFIG,
- new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
null, null, SEGMENT_OPERATIONS_THROTTLER);
+ SCHEMA, new SegmentReloadSemaphore(1),
Executors.newSingleThreadExecutor(), null, null,
+ SEGMENT_OPERATIONS_THROTTLER);
return tableDataManager;
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
index 372d5ce9c2..cd5c0c7c9b 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
@@ -89,7 +89,6 @@ public class DimensionTableDataManagerTest {
new SegmentDownloadThrottler(1, 2, true));
private File _indexDir;
- private SegmentMetadata _segmentMetadata;
private SegmentZKMetadata _segmentZKMetadata;
@BeforeClass
@@ -120,9 +119,9 @@ public class DimensionTableDataManagerTest {
String segmentName = driver.getSegmentName();
_indexDir = new File(tableDataDir, segmentName);
- _segmentMetadata = new SegmentMetadataImpl(_indexDir);
+ SegmentMetadata segmentMetadata = new SegmentMetadataImpl(_indexDir);
_segmentZKMetadata = new SegmentZKMetadata(segmentName);
- _segmentZKMetadata.setCrc(Long.parseLong(_segmentMetadata.getCrc()));
+ _segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
}
@AfterClass
@@ -174,7 +173,7 @@ public class DimensionTableDataManagerTest {
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
DimensionTableDataManager tableDataManager =
DimensionTableDataManager.createInstanceByTableName(OFFLINE_TABLE_NAME);
- tableDataManager.init(instanceDataManagerConfig, helixManager, new
SegmentLocks(), tableConfig,
+ tableDataManager.init(instanceDataManagerConfig, helixManager, new
SegmentLocks(), tableConfig, schema,
new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
null, null, SEGMENT_OPERATIONS_THROTTLER);
tableDataManager.start();
return tableDataManager;
@@ -294,8 +293,9 @@ public class DimensionTableDataManagerTest {
Schema schemaWithExtraColumn = getSchemaWithExtraColumn();
when(propertyStore.get("/SCHEMAS/dimBaseballTeams", null,
AccessOption.PERSISTENT)).thenReturn(
SchemaUtils.toZNRecord(schemaWithExtraColumn));
- tableDataManager.reloadSegment(_segmentZKMetadata.getSegmentName(),
- new IndexLoadingConfig(tableConfig, schemaWithExtraColumn),
_segmentZKMetadata, _segmentMetadata, false);
+ when(propertyStore.get("/SEGMENTS/dimBaseballTeams_OFFLINE/" +
_segmentZKMetadata.getSegmentName(), null,
+ AccessOption.PERSISTENT)).thenReturn(_segmentZKMetadata.toZNRecord());
+ tableDataManager.reloadSegment(_segmentZKMetadata.getSegmentName(), false);
// Confirm the new column is available for lookup
teamCitySpec = tableDataManager.getColumnFieldSpec("teamCity");
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 8bf32c576b..1aa0367421 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -35,12 +35,10 @@ import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.config.TableConfigUtils;
import
org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider;
import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
@@ -74,7 +72,6 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -750,18 +747,13 @@ public class RealtimeSegmentDataManagerTest {
@Test
public void
testShutdownTableDataManagerWillNotShutdownLeaseExtenderExecutor()
throws Exception {
- TableConfig tableConfig = createTableConfig();
- tableConfig.setUpsertConfig(null);
- ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
- when(propertyStore.get(anyString(), any(),
anyInt())).thenReturn(TableConfigUtils.toZNRecord(tableConfig));
- HelixManager helixManager = mock(HelixManager.class);
- when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
-
InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
TableDataManagerProvider tableDataManagerProvider = new
DefaultTableDataManagerProvider();
- tableDataManagerProvider.init(instanceDataManagerConfig, helixManager, new
SegmentLocks(), null);
- TableDataManager tableDataManager =
tableDataManagerProvider.getTableDataManager(tableConfig);
+ tableDataManagerProvider.init(instanceDataManagerConfig,
mock(HelixManager.class), new SegmentLocks(), null);
+ TableConfig tableConfig = createTableConfig();
+ Schema schema = Fixtures.createSchema();
+ TableDataManager tableDataManager =
tableDataManagerProvider.getTableDataManager(tableConfig, schema);
tableDataManager.start();
tableDataManager.shutDown();
Assert.assertFalse(SegmentBuildTimeLeaseExtender.isExecutorShutdown());
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
index 9afed0b634..7c9bd2e4f4 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
@@ -136,7 +136,7 @@ public class QueryExecutorExceptionsTest {
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(INDEX_DIR.getAbsolutePath());
TableDataManagerProvider tableDataManagerProvider = new
DefaultTableDataManagerProvider();
tableDataManagerProvider.init(instanceDataManagerConfig,
mock(HelixManager.class), new SegmentLocks(), null);
- TableDataManager tableDataManager =
tableDataManagerProvider.getTableDataManager(tableConfig);
+ TableDataManager tableDataManager =
tableDataManagerProvider.getTableDataManager(tableConfig, schema);
tableDataManager.start();
//we don't add index segments to the data manager to simulate
numSegmentsAcquired < numSegmentsQueried
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index 095a1c0a89..022d134d36 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -160,7 +160,7 @@ public class QueryExecutorTest {
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
TableDataManagerProvider tableDataManagerProvider = new
DefaultTableDataManagerProvider();
tableDataManagerProvider.init(instanceDataManagerConfig,
mock(HelixManager.class), new SegmentLocks(), null);
- TableDataManager tableDataManager =
tableDataManagerProvider.getTableDataManager(tableConfig);
+ TableDataManager tableDataManager =
tableDataManagerProvider.getTableDataManager(tableConfig, schema);
tableDataManager.start();
for (ImmutableSegment indexSegment : _indexSegments) {
tableDataManager.addSegment(indexSegment);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
index aa2d9e2cc1..db1af9416c 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
@@ -279,7 +279,7 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest
{
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
TableDataManagerProvider tableDataManagerProvider = new
DefaultTableDataManagerProvider();
tableDataManagerProvider.init(instanceDataManagerConfig,
mock(HelixManager.class), new SegmentLocks(), null);
- TableDataManager tableDataManager =
tableDataManagerProvider.getTableDataManager(TABLE_CONFIG);
+ TableDataManager tableDataManager =
tableDataManagerProvider.getTableDataManager(TABLE_CONFIG, SCHEMA);
tableDataManager.start();
for (IndexSegment indexSegment : _indexSegments) {
tableDataManager.addSegment((ImmutableSegment) indexSegment);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
index 1c509ca5e8..ffbd50a3f5 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
@@ -141,7 +141,7 @@ public class SegmentWithNullValueVectorTest {
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
TableDataManagerProvider tableDataManagerProvider = new
DefaultTableDataManagerProvider();
tableDataManagerProvider.init(instanceDataManagerConfig,
mock(HelixManager.class), new SegmentLocks(), null);
- TableDataManager tableDataManager =
tableDataManagerProvider.getTableDataManager(tableConfig);
+ TableDataManager tableDataManager =
tableDataManagerProvider.getTableDataManager(tableConfig, _schema);
tableDataManager.start();
tableDataManager.addSegment(_segment);
_instanceDataManager = mock(InstanceDataManager.class);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
index 7fbc220806..67cca949e9 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
@@ -37,6 +37,7 @@ import
org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -62,8 +63,9 @@ public class FailureInjectingTableDataManagerProvider
implements TableDataManage
}
@Override
- public TableDataManager getTableDataManager(TableConfig tableConfig,
SegmentReloadSemaphore segmentReloadSemaphore,
- ExecutorService segmentReloadExecutor, @Nullable ExecutorService
segmentPreloadExecutor,
+ public TableDataManager getTableDataManager(TableConfig tableConfig, Schema
schema,
+ SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService
segmentReloadExecutor,
+ @Nullable ExecutorService segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
Supplier<Boolean> isServerReadyToServeQueries) {
TableDataManager tableDataManager;
@@ -89,9 +91,8 @@ public class FailureInjectingTableDataManagerProvider
implements TableDataManage
default:
throw new IllegalStateException();
}
- tableDataManager.init(_instanceDataManagerConfig, _helixManager,
_segmentLocks, tableConfig, segmentReloadSemaphore,
- segmentReloadExecutor, segmentPreloadExecutor,
- errorCache, null);
+ tableDataManager.init(_instanceDataManagerConfig, _helixManager,
_segmentLocks, tableConfig, schema,
+ segmentReloadSemaphore, segmentReloadExecutor, segmentPreloadExecutor,
errorCache, null);
return tableDataManager;
}
}
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
index 3ed62ec35a..d2aa77321d 100644
---
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
+++
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
@@ -184,7 +184,7 @@ public class BenchmarkDimensionTableOverhead extends
BaseQueriesTest {
String tableName = TABLE_NAME + "_" + _iteration;
_tableDataManager =
DimensionTableDataManager.createInstanceByTableName(tableName);
- _tableDataManager.init(instanceDataManagerConfig, helixManager, new
SegmentLocks(), tableConfig,
+ _tableDataManager.init(instanceDataManagerConfig, helixManager, new
SegmentLocks(), tableConfig, SCHEMA,
new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
null, null, SEGMENT_OPERATIONS_THROTTLER);
_tableDataManager.start();
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index ed1f96f4da..e8351e5cbe 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -53,8 +53,8 @@ public interface TableDataManager {
* Initializes the table data manager. Should be called only once and before
calling any other method.
*/
void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager
helixManager, SegmentLocks segmentLocks,
- TableConfig tableConfig, SegmentReloadSemaphore segmentReloadSemaphore,
ExecutorService segmentReloadExecutor,
- @Nullable ExecutorService segmentPreloadExecutor,
+ TableConfig tableConfig, Schema schema, SegmentReloadSemaphore
segmentReloadSemaphore,
+ ExecutorService segmentReloadExecutor, @Nullable ExecutorService
segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
@Nullable SegmentOperationsThrottler segmentOperationsThrottler);
@@ -294,23 +294,16 @@ public interface TableDataManager {
*/
SegmentZKMetadata fetchZKMetadata(String segmentName);
- /**
- * Fetches the table config and schema for the table from ZK.
- */
- Pair<TableConfig, Schema> fetchTableConfigAndSchema();
-
/**
* Fetches the table config and schema for the table from ZK, then construct
the index loading config with them.
*/
- default IndexLoadingConfig fetchIndexLoadingConfig() {
- Pair<TableConfig, Schema> tableConfigSchemaPair =
fetchTableConfigAndSchema();
- return getIndexLoadingConfig(tableConfigSchemaPair.getLeft(),
tableConfigSchemaPair.getRight());
- }
+ IndexLoadingConfig fetchIndexLoadingConfig();
/**
- * Constructs the index loading config for the table with the given table
config and schema.
+ * Returns the cached latest {@link IndexLoadingConfig} for the table. The
cache is refreshed when invoking
+ * {@link #fetchIndexLoadingConfig()}.
*/
- IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig, Schema
schema);
+ IndexLoadingConfig getIndexLoadingConfig();
/**
* Interface to handle segment state transitions from CONSUMING to DROPPED
@@ -330,9 +323,8 @@ public interface TableDataManager {
/**
* Return list of segment names that are stale along with reason.
- * @param tableConfig Table Config of the table
- * @param schema Schema of the table
+ *
* @return List of {@link StaleSegment} with segment names and reason why it
is stale
*/
- List<StaleSegment> getStaleSegments(TableConfig tableConfig, Schema schema);
+ List<StaleSegment> getStaleSegments();
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index f7038ed11f..3df7a4290e 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -99,11 +99,9 @@ import
org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.server.access.AccessControlFactory;
import org.apache.pinot.server.api.AdminApiApplication;
import org.apache.pinot.server.starter.ServerInstance;
-import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -1188,8 +1186,7 @@ public class TablesResource {
tableName = DatabaseUtils.translateTableName(tableName, headers);
TableDataManager tableDataManager =
ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableName);
try {
- Pair<TableConfig, Schema> tableConfigSchemaPair =
tableDataManager.fetchTableConfigAndSchema();
- return
tableDataManager.getStaleSegments(tableConfigSchemaPair.getLeft(),
tableConfigSchemaPair.getRight());
+ return tableDataManager.getStaleSegments();
} catch (Exception e) {
throw new WebApplicationException(e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR);
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index b79a759e76..b3ed24fa81 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -63,6 +63,7 @@ import
org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -294,9 +295,11 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore,
tableNameWithType);
Preconditions.checkState(tableConfig != null, "Failed to find table
config for table: %s", tableNameWithType);
}
+ Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
tableNameWithType);
+ Preconditions.checkState(schema != null, "Failed to find schema for table:
%s", tableNameWithType);
TableDataManager tableDataManager =
- _tableDataManagerProvider.getTableDataManager(tableConfig,
_segmentReloadSemaphore, _segmentReloadExecutor,
- _segmentPreloadExecutor, _errorCache,
_isServerReadyToServeQueries);
+ _tableDataManagerProvider.getTableDataManager(tableConfig, schema,
_segmentReloadSemaphore,
+ _segmentReloadExecutor, _segmentPreloadExecutor, _errorCache,
_isServerReadyToServeQueries);
tableDataManager.start();
LOGGER.info("Created table data manager for table: {}", tableNameWithType);
return tableDataManager;
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
index 1cc0b05b83..9967cade27 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
@@ -51,15 +51,16 @@ import
org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.server.access.AllowAllAccessFactory;
import org.apache.pinot.server.starter.ServerInstance;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
-import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.StringUtil;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -68,6 +69,8 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
public abstract class BaseResourceTest {
@@ -98,9 +101,9 @@ public abstract class BaseResourceTest {
ServerMetrics.register(mock(ServerMetrics.class));
FileUtils.deleteQuietly(TEMP_DIR);
- Assert.assertTrue(TEMP_DIR.mkdirs());
+ assertTrue(TEMP_DIR.mkdirs());
URL resourceUrl = getClass().getClassLoader().getResource(AVRO_DATA_PATH);
- Assert.assertNotNull(resourceUrl);
+ assertNotNull(resourceUrl);
_avroFile = new File(resourceUrl.getFile());
// Mock the instance data manager
@@ -197,13 +200,15 @@ public abstract class BaseResourceTest {
protected void addTable(String tableNameWithType) {
InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
- TableConfig tableConfig = mock(TableConfig.class);
- when(tableConfig.getTableName()).thenReturn(tableNameWithType);
-
when(tableConfig.getValidationConfig()).thenReturn(mock(SegmentsValidationAndRetentionConfig.class));
- // NOTE: Use OfflineTableDataManager for both OFFLINE and REALTIME table
because RealtimeTableDataManager requires
- // table config.
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ assertNotNull(tableType);
+ TableConfig tableConfig = new
TableConfigBuilder(tableType).setTableName(tableNameWithType).build();
+ Schema schema =
+ new
Schema.SchemaBuilder().setSchemaName(TableNameBuilder.extractRawTableName(tableNameWithType)).build();
+ // NOTE: Use OfflineTableDataManager for both OFFLINE and REALTIME table
because RealtimeTableDataManager performs
+ // more checks
TableDataManager tableDataManager = new OfflineTableDataManager();
- tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class),
new SegmentLocks(), tableConfig,
+ tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class),
new SegmentLocks(), tableConfig, schema,
new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
null, null, null);
tableDataManager.start();
_tableDataManagerMap.put(tableNameWithType, tableDataManager);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]