This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4876fb754d Add a Server API to list segments that need to be refreshed
for a table (#14451)
4876fb754d is described below
commit 4876fb754d5f22c1964323c6e114192ef13cea35
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Wed Dec 4 11:12:59 2024 +0530
Add a Server API to list segments that need to be refreshed for a table
(#14451)
* checkpoint
* Create API to get segments needing refresh
* Add controller API
* Checkstyle
* Checkpoint tests
* Consolidate setup to static variables
* Checkstyle
* Add more index tests
* Partition tests
* Add NullVectorValue Test
* Add NeedRefreshResponse with reason
* Add NeedRefreshResponse with reason
* Checkstyle & spotless
* Fix Jackson serde
* Fix FST Index test
* Checkpoint H3 experiments
* Checkpoint tests
* Fix all tests
* Doc strings
* Add column name to debug logs
* Release segments
* Add table name to message.
* Use instance name instead of url.
* Use Stale instead of needRefresh
* Add TableStaleSegmentResponse
* Checkstyle fixes
* Log time taken to get stale segments in server
* Add tests for startree index
---
.../api/resources/PinotSegmentRestletResource.java | 25 +
.../api/resources/TableStaleSegmentResponse.java | 67 ++
.../util/ServerSegmentMetadataReader.java | 35 +
.../pinot/controller/util/TableMetadataReader.java | 14 +
.../core/data/manager/BaseTableDataManager.java | 288 ++++++++
.../BaseTableDataManagerNeedRefreshTest.java | 730 +++++++++++++++++++++
.../data/manager/BaseTableDataManagerTest.java | 2 +-
.../tests/StaleSegmentCheckIntegrationTest.java | 201 ++++++
.../segment/local/data/manager/StaleSegment.java | 61 ++
.../local/data/manager/TableDataManager.java | 8 +
.../pinot/server/api/resources/TablesResource.java | 25 +
.../utils/builder/ControllerRequestURLBuilder.java | 4 +
12 files changed, 1459 insertions(+), 1 deletion(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index b114aa8844..7499098780 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -893,6 +893,31 @@ public class PinotSegmentRestletResource {
}
}
+ @GET
+ @Path("segments/{tableNameWithType}/isStale")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType",
action = Actions.Table.GET_METADATA)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Gets a list of segments that are stale from servers
hosting the table",
+ notes = "Gets a list of segments that are stale from servers hosting the
table")
+ public Map<String, TableStaleSegmentResponse> getStaleSegments(
+ @ApiParam(value = "Table name with type", required = true, example =
"myTable_REALTIME")
+ @PathParam("tableNameWithType") String tableNameWithType, @Context
HttpHeaders headers) {
+ tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType,
headers);
+ LOGGER.info("Received a request to check for segments requiring a refresh
from all servers hosting segments for "
+ + "table {}", tableNameWithType);
+ try {
+ TableMetadataReader tableMetadataReader =
+ new TableMetadataReader(_executor, _connectionManager,
_pinotHelixResourceManager);
+ return tableMetadataReader.getStaleSegments(tableNameWithType,
+ _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+ } catch (InvalidConfigException e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Status.BAD_REQUEST);
+ } catch (IOException ioe) {
+ throw new ControllerApplicationException(LOGGER, "Error parsing Pinot
server response: " + ioe.getMessage(),
+ Status.INTERNAL_SERVER_ERROR, ioe);
+ }
+ }
+
@GET
@Path("segments/{tableName}/zkmetadata")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.GET_METADATA)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableStaleSegmentResponse.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableStaleSegmentResponse.java
new file mode 100644
index 0000000000..eead74dd8f
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableStaleSegmentResponse.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import org.apache.pinot.segment.local.data.manager.StaleSegment;
+
+
+/**
+ * Result of a stale-segment check, holding a list of stale segments, a validity flag and an error message.
+ *
+ * <p>The two convenience constructors produce either a valid response (segment list set, error message
+ * {@code null}) or an invalid response (segment list {@code null}, error message set). The three-argument
+ * constructor is the Jackson creator and stores whatever it is given.
+ */
+public class TableStaleSegmentResponse {
+  private final List<StaleSegment> _staleSegmentList;
+  private final boolean _isValidResponse;
+  private final String _errorMessage;
+
+  /**
+   * Creates a valid response carrying the given stale segments.
+   *
+   * @param staleSegmentList stale segments to report
+   */
+  public TableStaleSegmentResponse(List<StaleSegment> staleSegmentList) {
+    this(staleSegmentList, true, null);
+  }
+
+  /**
+   * Creates an invalid response carrying only an error message; the stale segment list is {@code null}.
+   *
+   * @param errorMessage description of what went wrong
+   */
+  public TableStaleSegmentResponse(String errorMessage) {
+    this(null, false, errorMessage);
+  }
+
+  /**
+   * Jackson creator: populates all three fields from the JSON properties {@code staleSegmentList},
+   * {@code validResponse} and {@code errorMessage}.
+   */
+  @JsonCreator
+  public TableStaleSegmentResponse(@JsonProperty("staleSegmentList") List<StaleSegment> staleSegmentList,
+      @JsonProperty("validResponse") boolean isValidResponse,
+      @JsonProperty("errorMessage") String errorMessage) {
+    _staleSegmentList = staleSegmentList;
+    _isValidResponse = isValidResponse;
+    _errorMessage = errorMessage;
+  }
+
+  /** Returns the stale segment list; {@code null} when built through the error-message constructor. */
+  @JsonProperty
+  public List<StaleSegment> getStaleSegmentList() {
+    return _staleSegmentList;
+  }
+
+  /** Returns the validity flag; {@code false} when built through the error-message constructor. */
+  @JsonProperty
+  public boolean isValidResponse() {
+    return _isValidResponse;
+  }
+
+  /** Returns the error message; {@code null} when built through the segment-list constructor. */
+  @JsonProperty
+  public String getErrorMessage() {
+    return _errorMessage;
+  }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index 781140a978..8dde7f08fe 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -47,6 +47,8 @@ import
org.apache.pinot.common.restlet.resources.TableSegments;
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
+import org.apache.pinot.controller.api.resources.TableStaleSegmentResponse;
+import org.apache.pinot.segment.local.data.manager.StaleSegment;
import org.apache.pinot.spi.utils.JsonUtils;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
@@ -397,6 +399,34 @@ public class ServerSegmentMetadataReader {
return response;
}
+ public Map<String, TableStaleSegmentResponse> getStaleSegmentsFromServer(
+ String tableNameWithType, Set<String> serverInstances, BiMap<String,
String> endpoints, int timeoutMs) {
+ LOGGER.debug("Getting list of segments for refresh from servers for table
{}.", tableNameWithType);
+ List<String> serverURLs = new ArrayList<>();
+ for (String serverInstance : serverInstances) {
+ serverURLs.add(generateStaleSegmentsServerURL(tableNameWithType,
endpoints.get(serverInstance)));
+ }
+ BiMap<String, String> endpointsToServers = endpoints.inverse();
+ CompletionServiceHelper completionServiceHelper =
+ new CompletionServiceHelper(_executor, _connectionManager,
endpointsToServers);
+ CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+ completionServiceHelper.doMultiGetRequest(serverURLs,
tableNameWithType, false, timeoutMs);
+ Map<String, TableStaleSegmentResponse> serverResponses = new HashMap<>();
+
+ for (Map.Entry<String, String> streamResponse :
serviceResponse._httpResponses.entrySet()) {
+ try {
+ List<StaleSegment> staleSegments =
JsonUtils.stringToObject(streamResponse.getValue(),
+ new TypeReference<List<StaleSegment>>() { });
+ serverResponses.put(streamResponse.getKey(), new
TableStaleSegmentResponse(staleSegments));
+ } catch (Exception e) {
+ serverResponses.put(streamResponse.getKey(), new
TableStaleSegmentResponse(e.getMessage()));
+ LOGGER.error("Unable to parse server {} response for needRefresh for
table {} due to an error: ",
+ streamResponse.getKey(), tableNameWithType, e);
+ }
+ }
+ return serverResponses;
+ }
+
private String generateAggregateSegmentMetadataServerURL(String
tableNameWithType, List<String> columns,
String endpoint) {
tableNameWithType = URLEncoder.encode(tableNameWithType,
StandardCharsets.UTF_8);
@@ -470,4 +500,9 @@ public class ServerSegmentMetadataReader {
paramsStr = String.join("&", params);
return paramsStr;
}
+
+  /**
+   * Builds the URL of the stale-segment check for a table on one server.
+   *
+   * @param tableNameWithType table name with type suffix; URL-encoded as UTF-8 before being placed in the path
+   * @param endpoint server endpoint used as the URL prefix
+   * @return {@code <endpoint>/tables/<encoded table name>/segments/isStale}
+   */
+  private String generateStaleSegmentsServerURL(String tableNameWithType, String endpoint) {
+    String encodedTableName = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
+    return String.format("%s/tables/%s/segments/isStale", endpoint, encodedTableName);
+  }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
index c9e87b396b..48f53577a8 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
@@ -33,6 +33,7 @@ import
org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
+import org.apache.pinot.controller.api.resources.TableStaleSegmentResponse;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -199,4 +200,17 @@ public class TableMetadataReader {
segmentNames, timeoutMs, validDocIdsType,
numSegmentsBatchPerServerRequest);
return JsonUtils.objectToJsonNode(aggregateTableMetadataInfo);
}
+
+ public Map<String, TableStaleSegmentResponse> getStaleSegments(String
tableNameWithType,
+ int timeoutMs)
+ throws InvalidConfigException, IOException {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ List<String> serverInstances =
_pinotHelixResourceManager.getServerInstancesForTable(tableNameWithType,
tableType);
+ Set<String> serverInstanceSet = new HashSet<>(serverInstances);
+ BiMap<String, String> endpoints =
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverInstanceSet);
+ ServerSegmentMetadataReader serverSegmentMetadataReader =
+ new ServerSegmentMetadataReader(_executor, _connectionManager);
+ return
serverSegmentMetadataReader.getStaleSegmentsFromServer(tableNameWithType,
serverInstanceSet, endpoints,
+ timeoutMs);
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 10ff609b44..e3e17a6f4d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -30,6 +30,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -40,6 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
@@ -59,25 +62,40 @@ import
org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.StaleSegment;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
+import org.apache.pinot.segment.local.startree.StarTreeBuilderUtils;
+import
org.apache.pinot.segment.local.startree.v2.builder.StarTreeV2BuilderConfig;
import org.apache.pinot.segment.local.utils.SegmentLocks;
+import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentContext;
import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
@@ -1046,6 +1064,276 @@ public abstract class BaseTableDataManager implements
TableDataManager {
return needReload;
}
+ @Override
+ public List<StaleSegment> getStaleSegments(TableConfig tableConfig, Schema
schema) {
+ List<StaleSegment> staleSegments = new ArrayList<>();
+ List<SegmentDataManager> segmentDataManagers = acquireAllSegments();
+ final long startTime = System.currentTimeMillis();
+ try {
+ for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+ StaleSegment response = isSegmentStale(tableConfig, schema,
segmentDataManager);
+ if (response.isStale()) {
+ staleSegments.add(response);
+ }
+ }
+ } finally {
+ for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+ releaseSegment(segmentDataManager);
+ }
+ LOGGER.info("Time Taken to get stale segments: {} ms",
System.currentTimeMillis() - startTime);
+ }
+
+ return staleSegments;
+ }
+
+  /**
+   * Compares one loaded segment against the current table config and schema and reports the first difference found.
+   *
+   * <p>Checks run in this order and the method returns at the first mismatch:
+   * <ol>
+   *   <li>time column configured on the table differs from the segment's time column;</li>
+   *   <li>the schema has a physical column that the segment does not contain;</li>
+   *   <li>star-tree builder configs derived from the table config differ from those derived from the segment;</li>
+   *   <li>per physical column of the segment: column deleted from schema, field type, data type, single/multi
+   *       value, dictionary encoding, sorted column, bloom filter, JSON, text, FST, H3, inverted, null value vector
+   *       (removal only), range index, and partitioning (function, function name, partition count, and the segment
+   *       holding exactly one partition).</li>
+   * </ol>
+   *
+   * @param tableConfig table config to compare against
+   * @param schema schema to compare against
+   * @param segmentDataManager data manager of the segment to inspect
+   * @return a {@link StaleSegment} flagged stale with a reason, or flagged not stale with a {@code null} reason
+   */
+  protected StaleSegment isSegmentStale(TableConfig tableConfig, Schema schema,
+      SegmentDataManager segmentDataManager) {
+    String tableNameWithType = tableConfig.getTableName();
+    Map<String, FieldIndexConfigs> indexConfigsMap =
+        FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig, schema);
+
+    String segmentName = segmentDataManager.getSegmentName();
+    IndexSegment segment = segmentDataManager.getSegment();
+    SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
+    Set<String> segmentPhysicalColumns = segment.getPhysicalColumnNames();
+
+    // Time column changed
+    String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
+    if (timeColumn != null) {
+      if (segmentMetadata.getTimeColumn() == null || !segmentMetadata.getTimeColumn().equals(timeColumn)) {
+        LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: time column", tableNameWithType, segmentName);
+        return new StaleSegment(segmentName, true, "time column");
+      }
+    }
+
+    List<String> sortedColumns = tableConfig.getIndexingConfig().getSortedColumn();
+    String sortedColumn = CollectionUtils.isNotEmpty(sortedColumns) ? sortedColumns.get(0) : null;
+
+    String partitionColumn = null;
+    ColumnPartitionConfig partitionConfig = null;
+    SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
+    // NOTE: Partition can only be enabled on a single column
+    if (segmentPartitionConfig != null && segmentPartitionConfig.getColumnPartitionMap().size() == 1) {
+      Map.Entry<String, ColumnPartitionConfig> entry =
+          segmentPartitionConfig.getColumnPartitionMap().entrySet().iterator().next();
+      partitionColumn = entry.getKey();
+      partitionConfig = entry.getValue();
+    }
+
+    Set<String> columnsInSegment = segmentMetadata.getAllColumns();
+
+    // Column is added
+    if (!columnsInSegment.containsAll(schema.getPhysicalColumnNames())) {
+      LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: column added", tableNameWithType, segmentName);
+      return new StaleSegment(segmentName, true, "column added");
+    }
+
+    // Get Index configuration for the Table Config
+    Set<String> noDictionaryColumns =
+        FieldIndexConfigsUtil.columnsWithIndexDisabled(StandardIndexes.dictionary(), indexConfigsMap);
+    Set<String> bloomFilters =
+        FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.bloomFilter(), indexConfigsMap);
+    Set<String> jsonIndex = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.json(), indexConfigsMap);
+    Set<String> invertedIndex =
+        FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.inverted(), indexConfigsMap);
+    Set<String> nullValueVectorIndex =
+        FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.nullValueVector(), indexConfigsMap);
+    Set<String> rangeIndex = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.range(), indexConfigsMap);
+    Set<String> h3Indexes = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.h3(), indexConfigsMap);
+    Set<String> fstIndexes = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.fst(), indexConfigsMap);
+    Set<String> textIndexes = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.text(), indexConfigsMap);
+    List<StarTreeIndexConfig> starTreeIndexConfigsFromTableConfig =
+        tableConfig.getIndexingConfig().getStarTreeIndexConfigs();
+
+    // Get the star-trees currently present on the segment (may be null when the segment has none).
+    List<StarTreeV2> starTreeIndexMetadata = segment.getStarTrees();
+
+    // Generate StarTree index builder config from the segment metadata.
+    List<StarTreeV2BuilderConfig> builderConfigFromSegmentMetadata = new ArrayList<>();
+    if (starTreeIndexMetadata != null) {
+      for (StarTreeV2 starTreeV2 : starTreeIndexMetadata) {
+        builderConfigFromSegmentMetadata.add(StarTreeV2BuilderConfig.fromMetadata(starTreeV2.getMetadata()));
+      }
+    }
+
+    // Generate StarTree index builder configs from the table config.
+    List<StarTreeV2BuilderConfig> builderConfigFromTableConfigs =
+        StarTreeBuilderUtils.generateBuilderConfigs(starTreeIndexConfigsFromTableConfig,
+            tableConfig.getIndexingConfig().isEnableDefaultStarTree(), segmentMetadata);
+
+    // Check if there is a mismatch between the StarTree index builder configs from the table config and the segment
+    // metadata.
+    if (!StarTreeBuilderUtils.areStarTreeBuilderConfigListsEqual(builderConfigFromTableConfigs,
+        builderConfigFromSegmentMetadata)) {
+      // Log like every other early return so the reason is visible in server debug logs.
+      LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: startree index", tableNameWithType, segmentName);
+      return new StaleSegment(segmentName, true, "startree index");
+    }
+
+    for (String columnName : segmentPhysicalColumns) {
+      ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(columnName);
+      FieldSpec fieldSpecInSchema = schema.getFieldSpecFor(columnName);
+      DataSource source = segment.getDataSource(columnName);
+      Preconditions.checkNotNull(columnMetadata);
+      Preconditions.checkNotNull(source);
+
+      // Column is deleted
+      if (fieldSpecInSchema == null) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: column deleted",
+            tableNameWithType, columnName, segmentName);
+        return new StaleSegment(segmentName, true, "column deleted: " + columnName);
+      }
+
+      // Field type changed
+      if (columnMetadata.getFieldType().compareTo(fieldSpecInSchema.getFieldType()) != 0) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: field type", tableNameWithType,
+            columnName, segmentName);
+        return new StaleSegment(segmentName, true, "field type changed: " + columnName);
+      }
+
+      // Data type changed
+      if (!columnMetadata.getDataType().equals(fieldSpecInSchema.getDataType())) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: data type", tableNameWithType,
+            columnName, segmentName);
+        return new StaleSegment(segmentName, true, "data type changed: " + columnName);
+      }
+
+      // SV/MV changed
+      if (columnMetadata.isSingleValue() != fieldSpecInSchema.isSingleValueField()) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: single / multi value",
+            tableNameWithType, columnName, segmentName);
+        return new StaleSegment(segmentName, true, "single / multi value changed: " + columnName);
+      }
+
+      // TODO: detect if an index changes from Dictionary to Variable Length Dictionary or vice versa.
+      // TODO: RV TEST
+      boolean colHasDictionary = columnMetadata.hasDictionary();
+      // Encoding changed
+      if (colHasDictionary == noDictionaryColumns.contains(columnName)) {
+        // Check if dictionary update is needed
+        // 1. If the segment metadata has dictionary enabled and table has it disabled, its incompatible and refresh is
+        // needed.
+        // 2. If segment metadata has dictionary disabled, check if it has to be overridden. If not overridden,
+        // refresh is needed, since table has it enabled.
+        boolean incompatible = colHasDictionary || DictionaryIndexType.ignoreDictionaryOverride(
+            tableConfig.getIndexingConfig().isOptimizeDictionary(),
+            tableConfig.getIndexingConfig().isOptimizeDictionaryForMetrics(),
+            tableConfig.getIndexingConfig().getNoDictionarySizeRatioThreshold(),
+            tableConfig.getIndexingConfig().getNoDictionaryCardinalityRatioThreshold(), fieldSpecInSchema,
+            indexConfigsMap.get(columnName), columnMetadata.getCardinality(), columnMetadata.getTotalNumberOfEntries());
+        if (incompatible) {
+          LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: dictionary encoding,",
+              tableNameWithType, columnName, segmentName);
+          return new StaleSegment(segmentName, true, "dictionary encoding changed: " + columnName);
+        } else {
+          LOGGER.debug("tableNameWithType: {}, segmentName: {}, no change as dictionary overrides applied to col: {}",
+              tableNameWithType, segmentName, columnName);
+        }
+      }
+
+      // Sorted column not sorted
+      if (columnName.equals(sortedColumn) && !columnMetadata.isSorted()) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: sort column", tableNameWithType,
+            columnName, segmentName);
+        return new StaleSegment(segmentName, true, "sort column changed: " + columnName);
+      }
+
+      // For the index checks below, "index missing on segment" == "index enabled in config" means a mismatch in
+      // either direction (needs to be added or needs to be removed).
+      if (Objects.isNull(source.getBloomFilter()) == bloomFilters.contains(columnName)) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: bloom filter changed",
+            tableNameWithType, columnName, segmentName);
+        return new StaleSegment(segmentName, true, "bloom filter changed: " + columnName);
+      }
+
+      if (Objects.isNull(source.getJsonIndex()) == jsonIndex.contains(columnName)) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: json index changed",
+            tableNameWithType, columnName, segmentName);
+        return new StaleSegment(segmentName, true, "json index changed: " + columnName);
+      }
+
+      if (Objects.isNull(source.getTextIndex()) == textIndexes.contains(columnName)) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: text index changed",
+            tableNameWithType, columnName, segmentName);
+        return new StaleSegment(segmentName, true, "text index changed: " + columnName);
+      }
+
+      if (Objects.isNull(source.getFSTIndex()) == fstIndexes.contains(columnName)) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: fst index changed",
+            tableNameWithType, columnName, segmentName);
+        return new StaleSegment(segmentName, true, "fst index changed: " + columnName);
+      }
+
+      if (Objects.isNull(source.getH3Index()) == h3Indexes.contains(columnName)) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: h3 index changed",
+            tableNameWithType, columnName, segmentName);
+        // Reason previously read "hst index changed", a typo for the H3 index.
+        return new StaleSegment(segmentName, true, "h3 index changed: " + columnName);
+      }
+
+      // If a segment is sorted then it will automatically be given an inverted index and that overrides the
+      // TableConfig setting
+      if (columnMetadata.isSorted()) {
+        // If a column is sorted and does not have an inverted index but the table config does have an inverted index.
+        // But do not remove the inverted index from a sorted column even if the table config has no inverted index.
+        if (Objects.isNull(source.getInvertedIndex()) && invertedIndex.contains(columnName)) {
+          LOGGER.debug(
+              "tableNameWithType: {}, columnName: {}, segmentName: {}, change: inverted index added to sorted column",
+              tableNameWithType, columnName, segmentName);
+          return new StaleSegment(segmentName, true, "invert index added to sort column: " + columnName);
+        }
+      } else {
+        if ((Objects.isNull(source.getInvertedIndex())) == invertedIndex.contains(columnName)) {
+          LOGGER.debug(
+              "tableNameWithType: {}, columnName: {}, segmentName: {}, change: inverted index changed on unsorted "
+                  + "column",
+              tableNameWithType, columnName, segmentName);
+          return new StaleSegment(segmentName, true, "inverted index changed on unsorted column: " + columnName);
+        }
+      }
+
+      // If a column has a NVV Reader and the Table Config says that it should not, then the NVV Reader can be removed.
+      // BUT if a column does NOT have a NVV Reader it cannot be added after the segment is created. So, for this check
+      // only check to see if an existing NVV Reader should be removed, but do not check if an NVV Reader needs to be
+      // added.
+      if (!Objects.isNull(source.getNullValueVector()) && !nullValueVectorIndex.contains(columnName)) {
+        LOGGER.debug(
+            "tableNameWithType: {}, columnName: {}, segmentName: {}, change: null value vector index removed from "
+                + "column and cannot be added back to this segment.", tableNameWithType, columnName, segmentName);
+        return new StaleSegment(segmentName, true, "null value vector index removed from column: " + columnName);
+      }
+
+      if (Objects.isNull(source.getRangeIndex()) == rangeIndex.contains(columnName)) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: range index changed",
+            tableNameWithType, columnName, segmentName);
+        return new StaleSegment(segmentName, true, "range index changed: " + columnName);
+      }
+
+      // Partition changed or segment not properly partitioned
+      // partitionConfig is non-null whenever partitionColumn is non-null: both are set together above.
+      if (columnName.equals(partitionColumn)) {
+        PartitionFunction partitionFunction = columnMetadata.getPartitionFunction();
+        if (partitionFunction == null) {
+          LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: partition function",
+              tableNameWithType, columnName, segmentName);
+          return new StaleSegment(segmentName, true, "partition function added: " + columnName);
+        }
+        if (!partitionFunction.getName().equalsIgnoreCase(partitionConfig.getFunctionName())) {
+          LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: partition function name",
+              tableNameWithType, columnName, segmentName);
+          return new StaleSegment(segmentName, true, "partition function name changed: " + columnName);
+        }
+        if (partitionFunction.getNumPartitions() != partitionConfig.getNumPartitions()) {
+          LOGGER.debug("tableNameWithType: {}, columnName: {},, segmentName: {}, change: num partitions",
+              tableNameWithType, columnName, segmentName);
+          return new StaleSegment(segmentName, true, "num partitions changed: " + columnName);
+        }
+        Set<Integer> partitions = columnMetadata.getPartitions();
+        if (partitions == null || partitions.size() != 1) {
+          LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: partitions", tableNameWithType,
+              columnName, segmentName);
+          return new StaleSegment(segmentName, true, "partitions changed: " + columnName);
+        }
+      }
+    }
+
+    return new StaleSegment(segmentName, false, null);
+  }
+
private SegmentDirectory initSegmentDirectory(String segmentName, String
segmentCrc,
IndexLoadingConfig indexLoadingConfig)
throws Exception {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
new file mode 100644
index 0000000000..14cfd8cb11
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
@@ -0,0 +1,730 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.StaleSegment;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.StarTreeAggregationConfig;
+import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+@Test
+public class BaseTableDataManagerNeedRefreshTest {
+ // Scratch directory, located under the JVM temp directory, that holds everything this test class writes.
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"BaseTableDataManagerNeedRefreshTest");
+ private static final String DEFAULT_TABLE_NAME = "mytable";
+ private static final String OFFLINE_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(DEFAULT_TABLE_NAME);
+ // Output directory for generated segments (see createSegment).
+ private static final File TABLE_DATA_DIR = new File(TEMP_DIR,
OFFLINE_TABLE_NAME);
+
+ // Column names used by the shared schema and the generated rows.
+ private static final String DEFAULT_TIME_COLUMN_NAME = "DaysSinceEpoch";
+ private static final String MS_SINCE_EPOCH_COLUMN_NAME =
"MilliSecondsSinceEpoch";
+ private static final String TEXT_INDEX_COLUMN = "textColumn";
+ private static final String TEXT_INDEX_COLUMN_MV = "textColumnMV";
+ private static final String PARTITIONED_COLUMN_NAME = "partitionedColumn";
+ private static final String DISTANCE_COLUMN_NAME = "Distance";
+ private static final String CARRIER_COLUMN_NAME = "Carrier";
+ private static final int NUM_PARTITIONS = 20; // For modulo function
+ // NOTE(review): mixed case looks intentional, presumably to exercise a case-insensitive comparison of
+ // partition function names - confirm against the production check.
+ private static final String PARTITION_FUNCTION_NAME = "MoDuLo";
+
+ private static final String JSON_INDEX_COLUMN = "jsonField";
+ private static final String FST_TEST_COLUMN = "DestCityName";
+ // Declared in the schema but never populated by generateRows().
+ private static final String NULL_VALUE_COLUMN = "NullValueColumn";
+
+ // Shared fixtures, assigned exactly once by the static initializer.
+ private static final TableConfig TABLE_CONFIG;
+ private static final Schema SCHEMA;
+ private static final ImmutableSegmentDataManager
IMMUTABLE_SEGMENT_DATA_MANAGER;
+ private static final BaseTableDataManager BASE_TABLE_DATA_MANAGER;
+
+ // Name of the test method currently running (set by setTestName); the StarTree tests use it as the segment name.
+ private String _testName = "defaultTestName";
+
+ // Builds the shared fixtures once, when the class is initialized: the default table config and schema, a
+ // segment named "basicSegment" generated from generateRows(), and a table data manager.
+ // Any failure is rethrown as an unchecked exception, which aborts class initialization.
+ static {
+ try {
+ TABLE_CONFIG = getTableConfigBuilder().build();
+ SCHEMA = getSchema();
+ IMMUTABLE_SEGMENT_DATA_MANAGER =
+ createImmutableSegmentDataManager(TABLE_CONFIG, SCHEMA,
"basicSegment", generateRows());
+ BASE_TABLE_DATA_MANAGER = BaseTableDataManagerTest.createTableManager();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+  /**
+   * Creates a fresh builder for the default OFFLINE table config used by these tests: time column
+   * {@code DaysSinceEpoch}, null handling enabled, and {@code textColumn} as a no-dictionary column.
+   *
+   * @return a new, independent builder that callers may customize further before calling {@code build()}
+   */
+  protected static TableConfigBuilder getTableConfigBuilder() {
+    TableConfigBuilder builder = new TableConfigBuilder(TableType.OFFLINE);
+    builder.setTableName(DEFAULT_TABLE_NAME);
+    builder.setTimeColumnName(DEFAULT_TIME_COLUMN_NAME);
+    builder.setNullHandlingEnabled(true);
+    builder.setNoDictionaryColumns(List.of(TEXT_INDEX_COLUMN));
+    return builder;
+  }
+
+  /**
+   * Builds the default schema shared by the tests: two date-time columns plus the dimension columns that the
+   * individual tests index, partition or re-declare.
+   *
+   * @return a newly built schema instance (callers are free to mutate it)
+   * @throws IOException declared by the original signature
+   */
+  protected static Schema getSchema()
+      throws IOException {
+    Schema.SchemaBuilder schemaBuilder = new Schema.SchemaBuilder();
+
+    // Date-time columns
+    schemaBuilder.addDateTime(DEFAULT_TIME_COLUMN_NAME, FieldSpec.DataType.INT, "1:DAYS:EPOCH", "1:DAYS");
+    schemaBuilder.addDateTime(MS_SINCE_EPOCH_COLUMN_NAME, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH",
+        "1:MILLISECONDS");
+
+    // Dimension columns
+    schemaBuilder.addSingleValueDimension(PARTITIONED_COLUMN_NAME, FieldSpec.DataType.INT);
+    schemaBuilder.addSingleValueDimension(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING);
+    schemaBuilder.addMultiValueDimension(TEXT_INDEX_COLUMN_MV, FieldSpec.DataType.STRING);
+    schemaBuilder.addSingleValueDimension(JSON_INDEX_COLUMN, FieldSpec.DataType.JSON);
+    schemaBuilder.addSingleValueDimension(FST_TEST_COLUMN, FieldSpec.DataType.STRING);
+    schemaBuilder.addSingleValueDimension(NULL_VALUE_COLUMN, FieldSpec.DataType.STRING);
+    schemaBuilder.addSingleValueDimension(DISTANCE_COLUMN_NAME, FieldSpec.DataType.INT);
+    schemaBuilder.addSingleValueDimension(CARRIER_COLUMN_NAME, FieldSpec.DataType.STRING);
+
+    return schemaBuilder.build();
+  }
+
+  /**
+   * Generates the three rows every test segment is built from.
+   *
+   * <p>Row {@code i} (0..2) carries time values {@code 20000 + i} days, partition value {@code i} and
+   * {@code _i}-suffixed multi-value text / FST values; {@code textColumn} holds the same value in every row.
+   * The rows are returned in the order 0, 2, 1.
+   *
+   * @return an immutable list of the generated rows
+   */
+  protected static List<GenericRow> generateRows() {
+    // Per-row values that do not follow from the row index.
+    int[] distances = {1000, 1000, 2000};
+    String[] carriers = {"c0", "c1", "c0"};
+
+    GenericRow[] rows = new GenericRow[distances.length];
+    for (int i = 0; i < rows.length; i++) {
+      GenericRow row = new GenericRow();
+      row.putValue(DEFAULT_TIME_COLUMN_NAME, 20000 + i);
+      row.putValue(MS_SINCE_EPOCH_COLUMN_NAME, (20000L + i) * 86400 * 1000);
+      row.putValue(TEXT_INDEX_COLUMN, "text_index_column_0");
+      row.putValue(TEXT_INDEX_COLUMN_MV, "text_index_column_" + i);
+      row.putValue(JSON_INDEX_COLUMN, "{\"a\":\"b\"}");
+      row.putValue(FST_TEST_COLUMN, "fst_test_column_" + i);
+      row.putValue(PARTITIONED_COLUMN_NAME, i);
+      row.putValue(DISTANCE_COLUMN_NAME, distances[i]);
+      row.putValue(CARRIER_COLUMN_NAME, carriers[i]);
+      rows[i] = row;
+    }
+
+    // Keep the original out-of-order emission: first, third, second.
+    return List.of(rows[0], rows[2], rows[1]);
+  }
+
+  /**
+   * Builds a v3 segment from the given rows under {@code TABLE_DATA_DIR}.
+   *
+   * @param tableConfig table config driving segment generation
+   * @param schema schema of the rows
+   * @param segmentName name of the segment to generate
+   * @param rows rows to write into the segment
+   * @return the path {@code TABLE_DATA_DIR/segmentName}
+   * @throws Exception if the creation driver fails to initialize or build
+   */
+  private static File createSegment(TableConfig tableConfig, Schema schema, String segmentName,
+      List<GenericRow> rows)
+      throws Exception {
+    SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    generatorConfig.setSegmentName(segmentName);
+    generatorConfig.setSegmentVersion(SegmentVersion.v3);
+    generatorConfig.setOutDir(TABLE_DATA_DIR.getAbsolutePath());
+
+    // Feed all supplied rows to the creation driver and build the segment.
+    GenericRowRecordReader recordReader = new GenericRowRecordReader(rows);
+    SegmentIndexCreationDriverImpl creationDriver = new SegmentIndexCreationDriverImpl();
+    creationDriver.init(generatorConfig, recordReader);
+    creationDriver.build();
+
+    return new File(TABLE_DATA_DIR, segmentName);
+  }
+
+  /**
+   * Generates a segment from the given rows, loads it, and wraps it in a mocked
+   * {@link ImmutableSegmentDataManager} that reports the segment name and the loaded segment.
+   *
+   * @param tableConfig table config used both to build and to load the segment
+   * @param schema schema used both to build and to load the segment
+   * @param segmentName name of the segment
+   * @param rows rows to write into the segment
+   * @return a mock whose {@code getSegmentName()} and {@code getSegment()} are stubbed
+   * @throws Exception if segment generation or loading fails
+   */
+  private static ImmutableSegmentDataManager createImmutableSegmentDataManager(TableConfig tableConfig,
+      Schema schema, String segmentName, List<GenericRow> rows)
+      throws Exception {
+    // Build the segment on disk, then load it with an index loading config derived from the same inputs.
+    File segmentDir = createSegment(tableConfig, schema, segmentName, rows);
+    ImmutableSegment loadedSegment =
+        ImmutableSegmentLoader.load(segmentDir, new IndexLoadingConfig(tableConfig, schema));
+
+    ImmutableSegmentDataManager mockedManager = mock(ImmutableSegmentDataManager.class);
+    when(mockedManager.getSegmentName()).thenReturn(segmentName);
+    when(mockedManager.getSegment()).thenReturn(loadedSegment);
+    return mockedManager;
+  }
+
+  /**
+   * Records the name of the test method about to run so that tests can use it as a segment name.
+   *
+   * @param method the test method about to be invoked
+   */
+  @BeforeMethod
+  void setTestName(Method method) {
+    String currentTestName = method.getName();
+    _testName = currentTestName;
+  }
+
+  /**
+   * A segment built without a time column is not stale against the config it was built with, but is reported
+   * stale with reason "time column" once checked against the default config, which declares one.
+   */
+  @Test
+  void testAddTimeColumn()
+      throws Exception {
+    // Table config and schema that deliberately omit any time column.
+    TableConfigBuilder configBuilder = new TableConfigBuilder(TableType.OFFLINE);
+    configBuilder.setTableName(DEFAULT_TABLE_NAME);
+    configBuilder.setNullHandlingEnabled(true);
+    configBuilder.setNoDictionaryColumns(Collections.singletonList(TEXT_INDEX_COLUMN));
+    TableConfig configWithoutTimeColumn = configBuilder.build();
+
+    Schema.SchemaBuilder schemaBuilder = new Schema.SchemaBuilder();
+    schemaBuilder.addSingleValueDimension(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING);
+    schemaBuilder.addSingleValueDimension(JSON_INDEX_COLUMN, FieldSpec.DataType.JSON);
+    schemaBuilder.addSingleValueDimension(FST_TEST_COLUMN, FieldSpec.DataType.STRING);
+    Schema schemaWithoutTimeColumn = schemaBuilder.build();
+
+    GenericRow singleRow = new GenericRow();
+    singleRow.putValue(TEXT_INDEX_COLUMN, "text_index_column");
+    singleRow.putValue(JSON_INDEX_COLUMN, "{\"a\":\"b\"}");
+    singleRow.putValue(FST_TEST_COLUMN, "fst_test_column");
+
+    ImmutableSegmentDataManager segmentWithoutTimeColumn =
+        createImmutableSegmentDataManager(configWithoutTimeColumn, schemaWithoutTimeColumn, "noChanges",
+            List.of(singleRow));
+    BaseTableDataManager localTableDataManager = BaseTableDataManagerTest.createTableManager();
+
+    // Same config and schema as the segment was built with: nothing changed.
+    StaleSegment unchanged =
+        localTableDataManager.isSegmentStale(configWithoutTimeColumn, schemaWithoutTimeColumn,
+            segmentWithoutTimeColumn);
+    assertFalse(unchanged.isStale());
+
+    // Default config and schema introduce a time column.
+    TableConfig configWithTimeColumn = getTableConfigBuilder().build();
+    Schema schemaWithTimeColumn = getSchema();
+    StaleSegment afterAddingTimeColumn =
+        localTableDataManager.isSegmentStale(configWithTimeColumn, schemaWithTimeColumn, segmentWithoutTimeColumn);
+    assertTrue(afterAddingTimeColumn.isStale());
+    assertEquals(afterAddingTimeColumn.getReason(), "time column");
+  }
+
+  /** Pointing the table config at a different time column marks the shared segment stale ("time column"). */
+  @Test
+  void testChangeTimeColumn() {
+    TableConfig configWithOtherTimeColumn =
+        getTableConfigBuilder().setTimeColumnName(MS_SINCE_EPOCH_COLUMN_NAME).build();
+
+    StaleSegment staleSegment =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(configWithOtherTimeColumn, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER);
+
+    assertTrue(staleSegment.isStale());
+    assertEquals(staleSegment.getReason(), "time column");
+  }
+
+  /** Removing {@code textColumn} from the schema marks the shared segment stale ("column deleted"). */
+  @Test
+  void testRemoveColumn()
+      throws Exception {
+    Schema schemaWithoutTextColumn = getSchema();
+    schemaWithoutTextColumn.removeField(TEXT_INDEX_COLUMN);
+
+    StaleSegment staleSegment =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schemaWithoutTextColumn,
+            IMMUTABLE_SEGMENT_DATA_MANAGER);
+
+    assertTrue(staleSegment.isStale());
+    assertEquals(staleSegment.getReason(), "column deleted: textColumn");
+  }
+
+  /** Re-declaring {@code textColumn} as a metric field marks the shared segment stale ("field type changed"). */
+  @Test
+  void testFieldType()
+      throws Exception {
+    // Swap the dimension spec of textColumn for a metric spec of the same data type.
+    MetricFieldSpec metricSpec = new MetricFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING, true);
+    Schema schemaWithMetric = getSchema();
+    schemaWithMetric.removeField(TEXT_INDEX_COLUMN);
+    schemaWithMetric.addField(metricSpec);
+
+    StaleSegment staleSegment =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schemaWithMetric, IMMUTABLE_SEGMENT_DATA_MANAGER);
+
+    assertTrue(staleSegment.isStale());
+    assertEquals(staleSegment.getReason(), "field type changed: textColumn");
+  }
+
+  /** Changing the data type of {@code textColumn} to INT marks the shared segment stale ("data type changed"). */
+  @Test
+  void testChangeDataType()
+      throws Exception {
+    // Same column name, still a dimension, but INT instead of STRING.
+    DimensionFieldSpec intSpec = new DimensionFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.INT, true);
+    Schema schemaWithIntColumn = getSchema();
+    schemaWithIntColumn.removeField(TEXT_INDEX_COLUMN);
+    schemaWithIntColumn.addField(intSpec);
+
+    StaleSegment staleSegment =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schemaWithIntColumn, IMMUTABLE_SEGMENT_DATA_MANAGER);
+
+    assertTrue(staleSegment.isStale());
+    assertEquals(staleSegment.getReason(), "data type changed: textColumn");
+  }
+
+  /**
+   * Re-declaring the single-value {@code textColumn} with the opposite single/multi-value flag marks the shared
+   * segment stale ("single / multi value changed").
+   */
+  @Test
+  void testChangeToMV()
+      throws Exception {
+    DimensionFieldSpec multiValueSpec =
+        new DimensionFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING, false);
+    Schema schemaWithMultiValue = getSchema();
+    schemaWithMultiValue.removeField(TEXT_INDEX_COLUMN);
+    schemaWithMultiValue.addField(multiValueSpec);
+
+    StaleSegment staleSegment =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schemaWithMultiValue, IMMUTABLE_SEGMENT_DATA_MANAGER);
+
+    assertTrue(staleSegment.isStale());
+    assertEquals(staleSegment.getReason(), "single / multi value changed: textColumn");
+  }
+
+  /**
+   * Re-declaring the multi-value {@code textColumnMV} with the opposite single/multi-value flag marks the shared
+   * segment stale ("single / multi value changed").
+   */
+  @Test
+  void testChangeToSV()
+      throws Exception {
+    DimensionFieldSpec singleValueSpec =
+        new DimensionFieldSpec(TEXT_INDEX_COLUMN_MV, FieldSpec.DataType.STRING, true);
+    Schema schemaWithSingleValue = getSchema();
+    schemaWithSingleValue.removeField(TEXT_INDEX_COLUMN_MV);
+    schemaWithSingleValue.addField(singleValueSpec);
+
+    StaleSegment staleSegment =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schemaWithSingleValue,
+            IMMUTABLE_SEGMENT_DATA_MANAGER);
+
+    assertTrue(staleSegment.isStale());
+    assertEquals(staleSegment.getReason(), "single / multi value changed: textColumnMV");
+  }
+
+  /**
+   * Declaring a sorted column whose data is not sorted in the segment marks it stale; declaring one whose data is
+   * sorted does not.
+   */
+  @Test
+  void testSortColumnMismatch() {
+    // MilliSecondsSinceEpoch is not sorted in the shared segment.
+    TableConfig unsortedColumnConfig = getTableConfigBuilder().setSortedColumn(MS_SINCE_EPOCH_COLUMN_NAME).build();
+    StaleSegment unsortedResult =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(unsortedColumnConfig, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER);
+    assertTrue(unsortedResult.isStale());
+    assertEquals(unsortedResult.getReason(), "sort column changed: MilliSecondsSinceEpoch");
+
+    // textColumn is sorted in the shared segment.
+    TableConfig sortedColumnConfig = getTableConfigBuilder().setSortedColumn(TEXT_INDEX_COLUMN).build();
+    StaleSegment sortedResult =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(sortedColumnConfig, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER);
+    assertFalse(sortedResult.isStale());
+  }
+
+  /**
+   * Supplies the cases for {@code testFilter}. Each entry is {segment name, table config that enables one index,
+   * expected stale reason}.
+   */
+  @DataProvider(name = "testFilterArgs")
+  private Object[][] testFilterArgs() {
+    TableConfig bloomFilterConfig =
+        getTableConfigBuilder().setBloomFilterColumns(List.of(TEXT_INDEX_COLUMN)).build();
+
+    TableConfig jsonIndexConfig = getTableConfigBuilder().setJsonIndexColumns(List.of(JSON_INDEX_COLUMN)).build();
+
+    FieldConfig textFieldConfig =
+        new FieldConfig(TEXT_INDEX_COLUMN, FieldConfig.EncodingType.DICTIONARY,
+            List.of(FieldConfig.IndexType.TEXT), null, null);
+    TableConfig textIndexConfig = getTableConfigBuilder().setFieldConfigList(List.of(textFieldConfig)).build();
+
+    FieldConfig fstFieldConfig =
+        new FieldConfig(FST_TEST_COLUMN, FieldConfig.EncodingType.DICTIONARY, List.of(FieldConfig.IndexType.FST),
+            null, Map.of(FieldConfig.TEXT_FST_TYPE, FieldConfig.TEXT_NATIVE_FST_LITERAL));
+    TableConfig fstIndexConfig = getTableConfigBuilder().setFieldConfigList(List.of(fstFieldConfig)).build();
+
+    TableConfig rangeIndexConfig =
+        getTableConfigBuilder().setRangeIndexColumns(List.of(MS_SINCE_EPOCH_COLUMN_NAME)).build();
+
+    return new Object[][]{
+        {"withBloomFilter", bloomFilterConfig, "bloom filter changed: textColumn"},
+        {"withJsonIndex", jsonIndexConfig, "json index changed: jsonField"},
+        {"withTextIndex", textIndexConfig, "text index changed: textColumn"},
+        {"withFstIndex", fstIndexConfig, "fst index changed: DestCityName"},
+        {"withRangeFilter", rangeIndexConfig, "range index changed: MilliSecondsSinceEpoch"}
+    };
+  }
+
+  /**
+   * Verifies index add / remove detection for one index type.
+   *
+   * @param segmentName name for the segment built with the index
+   * @param tableConfigWithFilter table config that enables the index
+   * @param expectedReason stale reason expected whenever config and segment disagree
+   */
+  @Test(dataProvider = "testFilterArgs")
+  void testFilter(String segmentName, TableConfig tableConfigWithFilter, String expectedReason)
+      throws Exception {
+    ImmutableSegmentDataManager indexedSegment =
+        createImmutableSegmentDataManager(tableConfigWithFilter, SCHEMA, segmentName, generateRows());
+
+    // Index in the table config, missing from the segment -> stale.
+    StaleSegment indexAdded =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfigWithFilter, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER);
+    assertTrue(indexAdded.isStale());
+    assertEquals(indexAdded.getReason(), expectedReason);
+
+    // Index in the segment, missing from the table config -> stale.
+    StaleSegment indexRemoved = BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, SCHEMA, indexedSegment);
+    assertTrue(indexRemoved.isStale());
+    assertEquals(indexRemoved.getReason(), expectedReason);
+
+    // Index present on both sides -> not stale.
+    StaleSegment indexUnchanged =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfigWithFilter, SCHEMA, indexedSegment);
+    assertFalse(indexUnchanged.isStale());
+  }
+
+  /**
+   * Covers partition config changes: adding a partition function, dropping it from the config, changing the
+   * number of partitions, and changing the function name.
+   */
+  @Test
+  void testPartition()
+      throws Exception {
+    ColumnPartitionConfig moduloPartitioning = new ColumnPartitionConfig(PARTITION_FUNCTION_NAME, NUM_PARTITIONS);
+    TableConfig moduloConfig = getTableConfigBuilder().setSegmentPartitionConfig(
+        new SegmentPartitionConfig(Map.of(PARTITIONED_COLUMN_NAME, moduloPartitioning))).build();
+    ImmutableSegmentDataManager partitionedSegment =
+        createImmutableSegmentDataManager(moduloConfig, SCHEMA, "partitionWithModulo", generateRows());
+
+    // Segment has no partition metadata but the table config declares partitioning -> stale.
+    StaleSegment partitionAdded =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(moduloConfig, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER);
+    assertTrue(partitionAdded.isStale());
+    assertEquals(partitionAdded.getReason(), "partition function added: partitionedColumn");
+
+    // Segment is partitioned but the table config declares no partitioning -> not stale.
+    StaleSegment partitionDropped = BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, SCHEMA, partitionedSegment);
+    assertFalse(partitionDropped.isStale());
+
+    // Different number of partitions -> stale.
+    ColumnPartitionConfig moduloWith40Partitions = new ColumnPartitionConfig(PARTITION_FUNCTION_NAME, 40);
+    TableConfig fortyPartitionsConfig = getTableConfigBuilder().setSegmentPartitionConfig(
+        new SegmentPartitionConfig(Map.of(PARTITIONED_COLUMN_NAME, moduloWith40Partitions))).build();
+    StaleSegment numPartitionsChanged =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(fortyPartitionsConfig, SCHEMA, partitionedSegment);
+    assertTrue(numPartitionsChanged.isStale());
+    assertEquals(numPartitionsChanged.getReason(), "num partitions changed: partitionedColumn");
+
+    // Different partition function -> stale.
+    ColumnPartitionConfig murmurPartitioning = new ColumnPartitionConfig("murmur", NUM_PARTITIONS);
+    TableConfig murmurConfig = getTableConfigBuilder().setSegmentPartitionConfig(
+        new SegmentPartitionConfig(Map.of(PARTITIONED_COLUMN_NAME, murmurPartitioning))).build();
+    StaleSegment functionChanged = BASE_TABLE_DATA_MANAGER.isSegmentStale(murmurConfig, SCHEMA, partitionedSegment);
+    assertTrue(functionChanged.isStale());
+    assertEquals(functionChanged.getReason(), "partition function name changed: partitionedColumn");
+  }
+
+  /**
+   * Null value vector (NVV) handling: disabling null handling for a segment that has an NVV makes it stale,
+   * while enabling it for a segment built without one does not.
+   */
+  @Test
+  void testNullValueVector()
+      throws Exception {
+    TableConfig nullHandlingDisabled = getTableConfigBuilder().setNullHandlingEnabled(false).build();
+    ImmutableSegmentDataManager segmentBuiltWithoutNvv =
+        createImmutableSegmentDataManager(nullHandlingDisabled, SCHEMA, "withoutNullHandling", generateRows());
+
+    // Null handling dropped from the config while the segment carries an NVV: the NVV can be removed -> stale.
+    StaleSegment nvvRemoved =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(nullHandlingDisabled, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER);
+    assertTrue(nvvRemoved.isStale());
+    assertEquals(nvvRemoved.getReason(), "null value vector index removed from column: NullValueColumn");
+
+    // Null handling enabled in the config while the segment has no NVV: it cannot be added -> not stale.
+    StaleSegment nvvAdded = BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, SCHEMA, segmentBuiltWithoutNvv);
+    assertFalse(nvvAdded.isStale());
+  }
+
+  /** Test 1: adding a StarTree index to the table config should trigger segment refresh. */
+  @Test
+  public void addStartreeIndex()
+      throws Exception {
+    // Segment built from the default config, which declares no StarTree index.
+    ImmutableSegmentDataManager plainSegment =
+        createImmutableSegmentDataManager(getTableConfigBuilder().build(), SCHEMA, _testName, generateRows());
+
+    String countStar = AggregationFunctionColumnPair.COUNT_STAR.toColumnName();
+    StarTreeIndexConfig carrierStarTree =
+        new StarTreeIndexConfig(Collections.singletonList("Carrier"), null, Collections.singletonList(countStar),
+            null, 100);
+    TableConfig starTreeTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(carrierStarTree)).build();
+
+    StaleSegment staleSegment = BASE_TABLE_DATA_MANAGER.isSegmentStale(starTreeTableConfig, SCHEMA, plainSegment);
+    assertTrue(staleSegment.isStale());
+  }
+
+  /**
+   * Test 2: a StarTree index whose split dimension list has the same size but a different element should trigger
+   * segment refresh.
+   */
+  @Test
+  public void testStarTreeIndexWithDifferentColumn()
+      throws Exception {
+    String countStar = AggregationFunctionColumnPair.COUNT_STAR.toColumnName();
+
+    // Segment is built with a StarTree index on Carrier.
+    StarTreeIndexConfig carrierStarTree =
+        new StarTreeIndexConfig(Collections.singletonList("Carrier"), null, Collections.singletonList(countStar),
+            null, 100);
+    TableConfig segmentTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(carrierStarTree)).build();
+    ImmutableSegmentDataManager carrierSegment =
+        createImmutableSegmentDataManager(segmentTableConfig, SCHEMA, _testName, generateRows());
+
+    // Table config now asks for a StarTree index on Distance instead.
+    StarTreeIndexConfig distanceStarTree =
+        new StarTreeIndexConfig(Collections.singletonList("Distance"), null, Collections.singletonList(countStar),
+            null, 100);
+    TableConfig updatedTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(distanceStarTree)).build();
+
+    StaleSegment staleSegment = BASE_TABLE_DATA_MANAGER.isSegmentStale(updatedTableConfig, SCHEMA, carrierSegment);
+    assertTrue(staleSegment.isStale());
+  }
+
+  /** Test 3: a StarTree index whose split dimension list has a different size should trigger segment refresh. */
+  @Test
+  public void testStarTreeIndexWithManyColumns()
+      throws Exception {
+    String countStar = AggregationFunctionColumnPair.COUNT_STAR.toColumnName();
+
+    // Segment is built with a StarTree index on Carrier only.
+    StarTreeIndexConfig carrierStarTree =
+        new StarTreeIndexConfig(Collections.singletonList("Carrier"), null, Collections.singletonList(countStar),
+            null, 100);
+    TableConfig segmentTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(carrierStarTree)).build();
+    ImmutableSegmentDataManager carrierSegment =
+        createImmutableSegmentDataManager(segmentTableConfig, SCHEMA, _testName, generateRows());
+
+    // Table config now splits on Carrier and Distance.
+    StarTreeIndexConfig twoDimensionStarTree =
+        new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, Collections.singletonList(countStar),
+            null, 100);
+    TableConfig updatedTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(twoDimensionStarTree)).build();
+
+    StaleSegment staleSegment = BASE_TABLE_DATA_MANAGER.isSegmentStale(updatedTableConfig, SCHEMA, carrierSegment);
+    assertTrue(staleSegment.isStale());
+  }
+
+  /**
+   * Test 4: a StarTree index with the same split dimension columns in a different order should trigger segment
+   * refresh.
+   */
+  @Test
+  public void testStartIndexWithDifferentOrder()
+      throws Exception {
+    String countStar = AggregationFunctionColumnPair.COUNT_STAR.toColumnName();
+
+    // Segment is built with split order Carrier, Distance.
+    StarTreeIndexConfig carrierFirstStarTree =
+        new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, Collections.singletonList(countStar),
+            null, 100);
+    TableConfig segmentTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(carrierFirstStarTree)).build();
+    ImmutableSegmentDataManager carrierFirstSegment =
+        createImmutableSegmentDataManager(segmentTableConfig, SCHEMA, _testName, generateRows());
+
+    // Table config now uses split order Distance, Carrier.
+    StarTreeIndexConfig distanceFirstStarTree =
+        new StarTreeIndexConfig(Arrays.asList("Distance", "Carrier"), null, Collections.singletonList(countStar),
+            null, 100);
+    TableConfig updatedTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(distanceFirstStarTree)).build();
+
+    StaleSegment staleSegment =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(updatedTableConfig, SCHEMA, carrierFirstSegment);
+    assertTrue(staleSegment.isStale());
+  }
+
+  /** Test 5: a StarTree index that adds skipped dimension columns should trigger segment refresh. */
+  @Test
+  void testStarTreeIndexWithSkipDimCols()
+      throws Exception {
+    String countStar = AggregationFunctionColumnPair.COUNT_STAR.toColumnName();
+
+    // Segment is built with a StarTree index on Carrier, Distance and no skipped dimension columns.
+    StarTreeIndexConfig withoutSkipDims =
+        new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, Collections.singletonList(countStar),
+            null, 100);
+    TableConfig segmentTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(withoutSkipDims)).build();
+    ImmutableSegmentDataManager builtSegment =
+        createImmutableSegmentDataManager(segmentTableConfig, SCHEMA, _testName, generateRows());
+
+    // Table config now also lists Carrier and Distance as skipped dimension columns.
+    StarTreeIndexConfig withSkipDims =
+        new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), Arrays.asList("Carrier", "Distance"),
+            Collections.singletonList(countStar), null, 100);
+    TableConfig updatedTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(withSkipDims)).build();
+
+    StaleSegment staleSegment = BASE_TABLE_DATA_MANAGER.isSegmentStale(updatedTableConfig, SCHEMA, builtSegment);
+    assertTrue(staleSegment.isStale());
+  }
+
+  /**
+   * Test 6: a StarTree index with the same skipped dimension columns in a different order should not trigger
+   * segment refresh.
+   */
+  @Test
+  void testStarTreeIndexWithDiffOrderSkipDimCols()
+      throws Exception {
+    String countStar = AggregationFunctionColumnPair.COUNT_STAR.toColumnName();
+
+    // Segment is built with skipped dimension columns Carrier, Distance.
+    StarTreeIndexConfig carrierFirstSkipDims =
+        new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), Arrays.asList("Carrier", "Distance"),
+            Collections.singletonList(countStar), null, 100);
+    TableConfig segmentTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(carrierFirstSkipDims)).build();
+    ImmutableSegmentDataManager builtSegment =
+        createImmutableSegmentDataManager(segmentTableConfig, SCHEMA, _testName, generateRows());
+
+    // Table config lists the same skipped dimension columns as Distance, Carrier.
+    StarTreeIndexConfig distanceFirstSkipDims =
+        new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), Arrays.asList("Distance", "Carrier"),
+            Collections.singletonList(countStar), null, 100);
+    TableConfig updatedTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(distanceFirstSkipDims)).build();
+
+    StaleSegment staleSegment = BASE_TABLE_DATA_MANAGER.isSegmentStale(updatedTableConfig, SCHEMA, builtSegment);
+    assertFalse(staleSegment.isStale());
+  }
+
+  /** Test 7: a StarTree index that drops its skipped dimension columns should trigger segment refresh. */
+  @Test
+  void testStarTreeIndexRemoveSkipDimCols()
+      throws Exception {
+    String countStar = AggregationFunctionColumnPair.COUNT_STAR.toColumnName();
+
+    // Segment is built with skipped dimension columns Carrier, Distance.
+    StarTreeIndexConfig withSkipDims =
+        new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), Arrays.asList("Carrier", "Distance"),
+            Collections.singletonList(countStar), null, 100);
+    TableConfig segmentTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(withSkipDims)).build();
+    ImmutableSegmentDataManager builtSegment =
+        createImmutableSegmentDataManager(segmentTableConfig, SCHEMA, _testName, generateRows());
+
+    // Table config no longer lists any skipped dimension columns.
+    StarTreeIndexConfig withoutSkipDims =
+        new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, Collections.singletonList(countStar),
+            null, 100);
+    TableConfig updatedTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(withoutSkipDims)).build();
+
+    StaleSegment staleSegment = BASE_TABLE_DATA_MANAGER.isSegmentStale(updatedTableConfig, SCHEMA, builtSegment);
+    assertTrue(staleSegment.isStale());
+  }
+
+  /** Test 8: a StarTree index with an additional metric aggregation function should trigger segment refresh. */
+  @Test
+  void testStarTreeIndexAddAggFn()
+      throws Exception {
+    String countStar = AggregationFunctionColumnPair.COUNT_STAR.toColumnName();
+
+    // Segment is built with COUNT(*) as the only aggregation.
+    StarTreeIndexConfig countOnlyStarTree =
+        new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, Collections.singletonList(countStar),
+            null, 100);
+    TableConfig segmentTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(countOnlyStarTree)).build();
+    ImmutableSegmentDataManager builtSegment =
+        createImmutableSegmentDataManager(segmentTableConfig, SCHEMA, _testName, generateRows());
+
+    // Table config adds MAX__Distance to the aggregations.
+    StarTreeIndexConfig countAndMaxStarTree =
+        new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
+            Arrays.asList(countStar, "MAX__Distance"), null, 100);
+    TableConfig updatedTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(countAndMaxStarTree)).build();
+
+    StaleSegment staleSegment = BASE_TABLE_DATA_MANAGER.isSegmentStale(updatedTableConfig, SCHEMA, builtSegment);
+    assertTrue(staleSegment.isStale());
+  }
+
+ /**
+  * Listing the same function-column pairs (COUNT(*) and "MAX__Distance") in a different order must not mark
+  * the segment as stale.
+  */
+ @Test
+ void testStarTreeIndexDiffOrderAggFn()
+     throws Exception {
+   String countStar = AggregationFunctionColumnPair.COUNT_STAR.toColumnName();
+
+   // Config the segment is built with: COUNT(*) first, MAX__Distance second.
+   StarTreeIndexConfig builtIndexConfig =
+       new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
+           Arrays.asList(countStar, "MAX__Distance"), null, 100);
+   TableConfig builtTableConfig =
+       getTableConfigBuilder().setStarTreeIndexConfigs(List.of(builtIndexConfig)).build();
+   ImmutableSegmentDataManager builtSegment =
+       createImmutableSegmentDataManager(builtTableConfig, SCHEMA, _testName, generateRows());
+
+   // Updated config: the same two pairs, reversed.
+   StarTreeIndexConfig reorderedIndexConfig =
+       new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
+           Arrays.asList("MAX__Distance", countStar), null, 100);
+   TableConfig reorderedTableConfig =
+       getTableConfigBuilder().setStarTreeIndexConfigs(List.of(reorderedIndexConfig)).build();
+
+   boolean stale = BASE_TABLE_DATA_MANAGER.isSegmentStale(reorderedTableConfig, SCHEMA, builtSegment).isStale();
+   assertFalse(stale);
+ }
+
+ /**
+  * A segment built with the "MAX__Distance" and COUNT(*) function-column pairs must be reported stale once
+  * the table config declares only a MAX(Distance) aggregation through a {@link StarTreeAggregationConfig},
+  * i.e. COUNT(*) is no longer configured.
+  */
+ @Test
+ void testStarTreeIndexRemoveAggFn()
+     throws Exception {
+   // Config the segment is built with: two function-column pairs, no aggregation configs.
+   StarTreeIndexConfig builtIndexConfig =
+       new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
+           Arrays.asList("MAX__Distance", AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+   TableConfig builtTableConfig =
+       getTableConfigBuilder().setStarTreeIndexConfigs(List.of(builtIndexConfig)).build();
+   ImmutableSegmentDataManager builtSegment =
+       createImmutableSegmentDataManager(builtTableConfig, SCHEMA, _testName, generateRows());
+
+   // Updated config: no function-column pairs, a single MAX(Distance) aggregation config.
+   StarTreeAggregationConfig maxDistance = new StarTreeAggregationConfig("Distance", "MAX");
+   StarTreeIndexConfig updatedIndexConfig =
+       new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, null, List.of(maxDistance), 100);
+   TableConfig updatedTableConfig =
+       getTableConfigBuilder().setStarTreeIndexConfigs(List.of(updatedIndexConfig)).build();
+
+   boolean stale = BASE_TABLE_DATA_MANAGER.isSegmentStale(updatedTableConfig, SCHEMA, builtSegment).isStale();
+   assertTrue(stale);
+ }
+
+ /**
+  * A segment built with only a MAX(Distance) aggregation config must be reported stale once the table config
+  * additionally lists the COUNT(*) function-column pair.
+  */
+ @Test
+ void testStarTreeIndexNewMetricAgg()
+     throws Exception {
+   StarTreeAggregationConfig maxDistance = new StarTreeAggregationConfig("Distance", "MAX");
+
+   // Config the segment is built with: aggregation config only, no function-column pairs.
+   StarTreeIndexConfig builtIndexConfig =
+       new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, null, List.of(maxDistance), 100);
+   TableConfig builtTableConfig =
+       getTableConfigBuilder().setStarTreeIndexConfigs(List.of(builtIndexConfig)).build();
+   ImmutableSegmentDataManager builtSegment =
+       createImmutableSegmentDataManager(builtTableConfig, SCHEMA, _testName, generateRows());
+
+   // Updated config: same aggregation config plus the COUNT(*) function-column pair.
+   StarTreeIndexConfig updatedIndexConfig =
+       new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
+           Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
+           Collections.singletonList(maxDistance), 100);
+   TableConfig updatedTableConfig =
+       getTableConfigBuilder().setStarTreeIndexConfigs(List.of(updatedIndexConfig)).build();
+
+   boolean stale = BASE_TABLE_DATA_MANAGER.isSegmentStale(updatedTableConfig, SCHEMA, builtSegment).isStale();
+   assertTrue(stale);
+ }
+
+ /**
+  * Expressing the same aggregations (COUNT(*) and MAX(Distance)) purely through aggregation configs, and in a
+  * different order, must not mark the segment as stale.
+  */
+ @Test
+ void testStarTreeIndexDiffOrderAggFn2()
+     throws Exception {
+   StarTreeAggregationConfig maxDistance = new StarTreeAggregationConfig("Distance", "MAX");
+
+   // Config the segment is built with: COUNT(*) as a function-column pair, MAX(Distance) as an aggregation config.
+   StarTreeIndexConfig builtIndexConfig =
+       new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
+           Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
+           Collections.singletonList(maxDistance), 100);
+   TableConfig builtTableConfig =
+       getTableConfigBuilder().setStarTreeIndexConfigs(List.of(builtIndexConfig)).build();
+   ImmutableSegmentDataManager builtSegment =
+       createImmutableSegmentDataManager(builtTableConfig, SCHEMA, _testName, generateRows());
+
+   // Updated config: both aggregations given as aggregation configs, count first.
+   StarTreeAggregationConfig countStar = new StarTreeAggregationConfig("*", "count");
+   StarTreeIndexConfig updatedIndexConfig =
+       new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, null,
+           Arrays.asList(countStar, maxDistance), 100);
+   TableConfig updatedTableConfig =
+       getTableConfigBuilder().setStarTreeIndexConfigs(List.of(updatedIndexConfig)).build();
+
+   boolean stale = BASE_TABLE_DATA_MANAGER.isSegmentStale(updatedTableConfig, SCHEMA, builtSegment).isStale();
+   assertFalse(stale);
+ }
+
+ /**
+  * Changing the last {@link StarTreeIndexConfig} constructor argument (the max-leaf setting, going by the test
+  * name) from 100 to 10 must mark the segment as stale.
+  */
+ @Test
+ void testStarTreeIndexMaxLeafNode()
+     throws Exception {
+   String countStar = AggregationFunctionColumnPair.COUNT_STAR.toColumnName();
+
+   // Config the segment is built with: last argument is 100.
+   StarTreeIndexConfig builtIndexConfig =
+       new StarTreeIndexConfig(Collections.singletonList("Carrier"), null, Collections.singletonList(countStar),
+           null, 100);
+   TableConfig builtTableConfig =
+       getTableConfigBuilder().setStarTreeIndexConfigs(List.of(builtIndexConfig)).build();
+   ImmutableSegmentDataManager builtSegment =
+       createImmutableSegmentDataManager(builtTableConfig, SCHEMA, _testName, generateRows());
+
+   // Updated config: identical except the last argument is 10.
+   StarTreeIndexConfig updatedIndexConfig =
+       new StarTreeIndexConfig(Collections.singletonList("Carrier"), null, Collections.singletonList(countStar),
+           null, 10);
+   TableConfig updatedTableConfig =
+       getTableConfigBuilder().setStarTreeIndexConfigs(List.of(updatedIndexConfig)).build();
+
+   boolean stale = BASE_TABLE_DATA_MANAGER.isSegmentStale(updatedTableConfig, SCHEMA, builtSegment).isStale();
+   assertTrue(stale);
+ }
+
+ /**
+  * A segment built with a star-tree index must be reported stale when checked against a table config that
+  * declares no star-tree index configs at all.
+  */
+ @Test
+ void testStarTreeIndexRemove()
+     throws Exception {
+   // Config the segment is built with: one star-tree index on "Carrier".
+   StarTreeIndexConfig builtIndexConfig =
+       new StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
+           Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+   TableConfig builtTableConfig =
+       getTableConfigBuilder().setStarTreeIndexConfigs(List.of(builtIndexConfig)).build();
+   ImmutableSegmentDataManager builtSegment =
+       createImmutableSegmentDataManager(builtTableConfig, SCHEMA, _testName, generateRows());
+
+   // Table config straight from the builder, without any star-tree index configs set.
+   TableConfig configWithoutStarTree = getTableConfigBuilder().build();
+
+   boolean stale = BASE_TABLE_DATA_MANAGER.isSegmentStale(configWithoutStarTree, SCHEMA, builtSegment).isStale();
+   assertTrue(stale);
+ }
+
+ /**
+  * A segment built with a single star-tree index (on "Carrier") must be reported stale once the table config
+  * declares a second star-tree index (on "Distance") alongside the original one.
+  */
+ @Test
+ void testStarTreeIndexAddMultiple()
+     throws Exception {
+   String countStar = AggregationFunctionColumnPair.COUNT_STAR.toColumnName();
+
+   // Config the segment is built with: one index on "Carrier".
+   StarTreeIndexConfig carrierIndexConfig =
+       new StarTreeIndexConfig(Collections.singletonList("Carrier"), null, Collections.singletonList(countStar),
+           null, 100);
+   TableConfig builtTableConfig =
+       getTableConfigBuilder().setStarTreeIndexConfigs(List.of(carrierIndexConfig)).build();
+   ImmutableSegmentDataManager builtSegment =
+       createImmutableSegmentDataManager(builtTableConfig, SCHEMA, _testName, generateRows());
+
+   // Updated config: the original index plus a new one on "Distance".
+   StarTreeIndexConfig distanceIndexConfig =
+       new StarTreeIndexConfig(Collections.singletonList("Distance"), null, Collections.singletonList(countStar),
+           null, 100);
+   TableConfig updatedTableConfig =
+       getTableConfigBuilder().setStarTreeIndexConfigs(List.of(carrierIndexConfig, distanceIndexConfig)).build();
+
+   boolean stale = BASE_TABLE_DATA_MANAGER.isSegmentStale(updatedTableConfig, SCHEMA, builtSegment).isStale();
+   assertTrue(stale);
+ }
+
+ /**
+  * A segment built without the default star-tree flag must be reported stale once the table config keeps the
+  * same explicit star-tree index but sets {@code enableDefaultStarTree} to {@code true}.
+  */
+ @Test
+ void testStarTreeIndexEnableDefault()
+     throws Exception {
+   // Config the segment is built with: one explicit index, default star-tree flag untouched.
+   StarTreeIndexConfig indexConfig =
+       new StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
+           Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+   TableConfig builtTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(indexConfig)).build();
+   ImmutableSegmentDataManager builtSegment =
+       createImmutableSegmentDataManager(builtTableConfig, SCHEMA, _testName, generateRows());
+
+   // Updated config: same explicit index, default star-tree switched on.
+   TableConfig updatedTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(indexConfig)).build();
+   updatedTableConfig.getIndexingConfig().setEnableDefaultStarTree(true);
+
+   boolean stale = BASE_TABLE_DATA_MANAGER.isSegmentStale(updatedTableConfig, SCHEMA, builtSegment).isStale();
+   assertTrue(stale);
+ }
+
+ /**
+  * Checking a segment against the very same table config it was built with must not report it as stale.
+  */
+ @Test
+ void testStarTreeIndexNoChanges()
+     throws Exception {
+   StarTreeIndexConfig indexConfig =
+       new StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
+           Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+   TableConfig unchangedTableConfig =
+       getTableConfigBuilder().setStarTreeIndexConfigs(List.of(indexConfig)).build();
+   ImmutableSegmentDataManager builtSegment =
+       createImmutableSegmentDataManager(unchangedTableConfig, SCHEMA, _testName, generateRows());
+
+   // The config passed to the staleness check is the same instance used to build the segment.
+   boolean stale = BASE_TABLE_DATA_MANAGER.isSegmentStale(unchangedTableConfig, SCHEMA, builtSegment).isStale();
+   assertFalse(stale);
+ }
+
+ /**
+  * A segment built with {@code enableDefaultStarTree} set to {@code true} must be reported stale once the
+  * table config keeps the same explicit star-tree index but sets the flag to {@code false}.
+  */
+ @Test
+ void testStarTreeIndexDisableDefault()
+     throws Exception {
+   // Config the segment is built with: one explicit index, default star-tree switched on.
+   StarTreeIndexConfig indexConfig =
+       new StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
+           Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+   TableConfig builtTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(indexConfig)).build();
+   builtTableConfig.getIndexingConfig().setEnableDefaultStarTree(true);
+   ImmutableSegmentDataManager builtSegment =
+       createImmutableSegmentDataManager(builtTableConfig, SCHEMA, _testName, generateRows());
+
+   // Updated config: same explicit index, default star-tree switched off.
+   TableConfig updatedTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(indexConfig)).build();
+   updatedTableConfig.getIndexingConfig().setEnableDefaultStarTree(false);
+
+   boolean stale = BASE_TABLE_DATA_MANAGER.isSegmentStale(updatedTableConfig, SCHEMA, builtSegment).isStale();
+   assertTrue(stale);
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 69a8d88fd6..1d17315aa7 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -648,7 +648,7 @@ public class BaseTableDataManagerTest {
}
}
- private static OfflineTableDataManager createTableManager() {
+ static OfflineTableDataManager createTableManager() {
return createTableManager(createDefaultInstanceDataManagerConfig());
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java
new file mode 100644
index 0000000000..c2fbd83cb9
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.controller.api.resources.TableStaleSegmentResponse;
+import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class StaleSegmentCheckIntegrationTest extends
BaseClusterIntegrationTest {
+ private static final String JSON_FIELD = "jsonField";
+
+ private PinotTaskManager _taskManager;
+ private PinotHelixTaskResourceManager _taskResourceManager;
+ private TableConfig _tableConfig;
+ private Schema _schema;
+ private List<File> _avroFiles;
+ private static final String H3_INDEX_COLUMN = "h3Column";
+ private static final Map<String, String> H3_INDEX_PROPERTIES =
Collections.singletonMap("resolutions", "5");
+ private static final String TEXT_INDEX_COLUMN = "textColumn";
+ private static final String NULL_INDEX_COLUMN = "nullField";
+
+ private static final String JSON_INDEX_COLUMN = "jsonField";
+ private static final String FST_TEST_COLUMN = "DestCityName";
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ // Start the Pinot cluster
+ startZk();
+ startController();
+ startBroker();
+ startServer();
+ startMinion();
+ // Start Kafka
+ startKafka();
+
+ _taskManager = _controllerStarter.getTaskManager();
+ _taskResourceManager = _controllerStarter.getHelixTaskResourceManager();
+ _avroFiles = unpackAvroData(_tempDir);
+
+ // Create and upload the schema and table config
+ _schema = createSchema();
+ _schema.addField(new DimensionFieldSpec(JSON_FIELD,
FieldSpec.DataType.STRING, true));
+ _schema.addField(new DimensionFieldSpec(NULL_INDEX_COLUMN,
FieldSpec.DataType.STRING, true));
+ _schema.addField(new DimensionFieldSpec(H3_INDEX_COLUMN,
FieldSpec.DataType.BYTES, true));
+ _schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN,
FieldSpec.DataType.STRING, true));
+
+ addSchema(_schema);
+
+ _tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setTimeColumnName(getTimeColumnName())
+
.setIngestionConfig(getIngestionConfig()).setNullHandlingEnabled(true)
+
.setNoDictionaryColumns(Collections.singletonList(TEXT_INDEX_COLUMN)).build();
+ addTableConfig(_tableConfig);
+
+ // Create and upload segments
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(_avroFiles,
_tableConfig, _schema, 0, _segmentDir, _tarDir);
+ uploadSegments(getTableName(), _tarDir);
+
+ // Wait for all documents loaded
+ waitForAllDocsLoaded(300_000L);
+ }
+
+ private FieldConfig getH3FieldConfig() {
+ return new FieldConfig(H3_INDEX_COLUMN,
FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.H3, null,
+ H3_INDEX_PROPERTIES);
+ }
+
+ private FieldConfig getTextFieldConfig() {
+ return new FieldConfig(TEXT_INDEX_COLUMN, FieldConfig.EncodingType.RAW,
FieldConfig.IndexType.TEXT, null, null);
+ }
+
+ private FieldConfig getFstFieldConfig() {
+ Map<String, String> propertiesMap = new HashMap<>();
+ propertiesMap.put(FieldConfig.TEXT_FST_TYPE,
FieldConfig.TEXT_NATIVE_FST_LITERAL);
+ return new FieldConfig(FST_TEST_COLUMN, FieldConfig.EncodingType.RAW,
FieldConfig.IndexType.TEXT, null,
+ propertiesMap);
+ }
+
+ @Override
+ protected IngestionConfig getIngestionConfig() {
+ List<TransformConfig> transforms = new ArrayList<>();
+ transforms.add(new TransformConfig(JSON_INDEX_COLUMN,
+
"Groovy({'{\"DestState\":\"'+DestState+'\",\"OriginState\":\"'+OriginState+'\"}'},
DestState, OriginState)"));
+ transforms.add(new TransformConfig(NULL_INDEX_COLUMN, "Groovy({null})"));
+ // This is the byte encoding of ST_POINT(-122, 37)
+ transforms.add(new TransformConfig(H3_INDEX_COLUMN,
+
"Groovy({[0x00,0xc0,0x5e,0x80,0x00,0x00,0x00,0x00,0x00,0x40,0x42,0x80,0x00,0x00,0x00,0x00,0x00]
as byte[]})"));
+ transforms.add(new TransformConfig(TEXT_INDEX_COLUMN, "Groovy({\"Hello
this is a text column\"})"));
+
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setTransformConfigs(transforms);
+
+ return ingestionConfig;
+ }
+
+ @Test
+ public void testAddRemoveSortedIndex()
+ throws Exception {
+ // Add a sorted column to the table
+ IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
+ indexingConfig.setSortedColumn(Collections.singletonList("Carrier"));
+ updateTableConfig(_tableConfig);
+
+ Map<String, TableStaleSegmentResponse> needRefreshResponses =
getStaleSegmentsResponse();
+ assertEquals(needRefreshResponses.size(), 1);
+
assertEquals(needRefreshResponses.values().iterator().next().getStaleSegmentList().size(),
12);
+ }
+
+ @Test(dependsOnMethods = "testAddRemoveSortedIndex")
+ public void testAddRemoveRawIndex()
+ throws Exception {
+ // Add a raw index column
+ IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
+
indexingConfig.setNoDictionaryColumns(Collections.singletonList("ActualElapsedTime"));
+ updateTableConfig(_tableConfig);
+
+ Map<String, TableStaleSegmentResponse> needRefreshResponses =
getStaleSegmentsResponse();
+ assertEquals(needRefreshResponses.size(), 1);
+
assertEquals(needRefreshResponses.values().iterator().next().getStaleSegmentList().size(),
12);
+ }
+
+ @Test(dependsOnMethods = "testAddRemoveSortedIndex")
+ public void testH3IndexChange()
+ throws Exception {
+ // Add a H3 index column
+
_tableConfig.setFieldConfigList(Collections.singletonList(getH3FieldConfig()));
+ updateTableConfig(_tableConfig);
+
+ Map<String, TableStaleSegmentResponse> needRefreshResponses =
getStaleSegmentsResponse();
+ assertEquals(needRefreshResponses.size(), 1);
+
assertEquals(needRefreshResponses.values().iterator().next().getStaleSegmentList().size(),
12);
+ }
+
+ private Map<String, TableStaleSegmentResponse> getStaleSegmentsResponse()
+ throws IOException {
+ return JsonUtils.stringToObject(sendGetRequest(
+ _controllerRequestURLBuilder.forStaleSegments(
+ TableNameBuilder.OFFLINE.tableNameWithType(getTableName()))),
+ new TypeReference<Map<String, TableStaleSegmentResponse>>() { });
+ }
+
+ @AfterClass
+ public void tearDown() {
+ try {
+ stopMinion();
+ stopServer();
+ stopBroker();
+ stopController();
+ stopZk();
+ } finally {
+ FileUtils.deleteQuietly(_tempDir);
+ }
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/StaleSegment.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/StaleSegment.java
new file mode 100644
index 0000000000..3e67093f1d
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/StaleSegment.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.data.manager;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+/**
+ * Staleness verdict for a single segment: the segment name, whether it is stale, and the reason.
+ *
+ * <p>Only the segment name and the reason are exposed as JSON properties; the stale flag is ignored during
+ * serialization, and an instance created through the JSON creator is always marked stale.
+ */
+public class StaleSegment {
+  private final String _segmentName;
+  private final boolean _isStale;
+  private final String _reason;
+
+  /**
+   * JSON creator. An instance built from the two serialized properties is always stale.
+   *
+   * @param segmentName name of the segment
+   * @param reason why the segment is stale
+   */
+  @JsonCreator
+  public StaleSegment(@JsonProperty("segmentName") String segmentName, @JsonProperty("reason") String reason) {
+    this(segmentName, true, reason);
+  }
+
+  /**
+   * @param segmentName name of the segment
+   * @param isStale whether the segment is stale
+   * @param reason why the segment is stale
+   */
+  public StaleSegment(String segmentName, boolean isStale, String reason) {
+    _segmentName = segmentName;
+    _isStale = isStale;
+    _reason = reason;
+  }
+
+  /** @return the segment name */
+  @JsonProperty
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  /** @return whether the segment is stale; excluded from the JSON representation */
+  @JsonIgnore
+  public boolean isStale() {
+    return _isStale;
+  }
+
+  /** @return the reason recorded for this segment */
+  @JsonProperty
+  public String getReason() {
+    return _reason;
+  }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 092701bdef..cf7e623269 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -323,4 +323,12 @@ public interface TableDataManager {
*/
default void onConsumingToOnline(String segmentNameStr) {
}
+
+ /**
+ * Returns the segments that are stale with respect to the given table config and schema, each with the
+ * reason it is considered stale.
+ *
+ * @param tableConfig Table Config of the table
+ * @param schema Schema of the table
+ * @return List of {@link StaleSegment}, one per stale segment, carrying the segment name and the reason
+ */
+ List<StaleSegment> getStaleSegments(TableConfig tableConfig, Schema schema);
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index b393ac050e..0506396e7e 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -84,6 +84,7 @@ import
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.StaleSegment;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.spi.ColumnMetadata;
@@ -96,9 +97,11 @@ import
org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.server.access.AccessControlFactory;
import org.apache.pinot.server.api.AdminApiApplication;
import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -1082,4 +1085,26 @@ public class TablesResource {
return ResourceUtils.convertToJsonString(
new ServerSegmentsReloadCheckResponse(needReload,
tableDataManager.getInstanceId()));
}
+
+ /**
+  * Lists the segments of the given table that are stale with respect to the table config and schema fetched
+  * by the table's data manager.
+  *
+  * @param tableName table name with type; translated via {@code DatabaseUtils.translateTableName} using the
+  *                  request headers
+  * @param headers HTTP headers of the request
+  * @return the stale segments reported by the table data manager
+  * @throws WebApplicationException with status 500 (and the original exception as its cause) if fetching the
+  *         table config/schema or computing the stale segments fails
+  */
+ @GET
+ @Path("/tables/{tableName}/segments/isStale")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Get the list of segments that are stale or deviated from table config.",
+     notes = "Get the list of segments that are stale or deviated from table config")
+ @ApiResponses(value = {
+     @ApiResponse(code = 200, message = "Success"),
+     @ApiResponse(code = 500, message = "Internal Server error", response = ErrorInfo.class)
+ })
+ public List<StaleSegment> getStaleSegments(
+     @ApiParam(value = "Table Name with type", required = true) @PathParam("tableName") String tableName,
+     @Context HttpHeaders headers) {
+   tableName = DatabaseUtils.translateTableName(tableName, headers);
+   TableDataManager tableDataManager = ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableName);
+   try {
+     Pair<TableConfig, Schema> tableConfigSchemaPair = tableDataManager.fetchTableConfigAndSchema();
+     return tableDataManager.getStaleSegments(tableConfigSchemaPair.getLeft(), tableConfigSchemaPair.getRight());
+   } catch (Exception e) {
+     // Chain the original exception so its type and stack trace are not lost.
+     throw new WebApplicationException(e.getMessage(), e, Response.Status.INTERNAL_SERVER_ERROR);
+   }
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 335f251995..da83dc2194 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -244,6 +244,10 @@ public class ControllerRequestURLBuilder {
return StringUtil.join("/", _baseUrl, "segments", tableNameWithType,
query);
}
+ /**
+  * Builds the URL of the stale-segments endpoint for a table by joining the base URL, "segments", the given
+  * table name and "isStale" with "/".
+  *
+  * @param tableNameWithType table name including the type suffix
+  * @return the stale-segments URL for the table
+  */
+ public String forStaleSegments(String tableNameWithType) {
+ return StringUtil.join("/", _baseUrl, "segments", tableNameWithType,
"isStale");
+ }
+
public String forTableRebalanceStatus(String jobId) {
return StringUtil.join("/", _baseUrl, "rebalanceStatus", jobId);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]