This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ae97265d86 #12117 Support for Server & Controller API to check for
Segments reload of a table in servers (#13789)
ae97265d86 is described below
commit ae97265d863ce1dd4c67a85bc8da0120e35fa144
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Wed Aug 28 12:07:28 2024 -0700
#12117 Support for Server & Controller API to check for Segments reload of
a table in servers (#13789)
---
.../ServerSegmentsReloadCheckResponse.java | 53 +++++++++++++
.../TableSegmentsReloadCheckResponse.java | 55 +++++++++++++
.../utils/SegmentsReloadCheckResponseTest.java | 90 ++++++++++++++++++++++
.../api/resources/PinotSegmentRestletResource.java | 43 +++++++++++
.../controller/helix/ControllerRequestClient.java | 11 +++
.../util/ServerSegmentMetadataReader.java | 42 ++++++++++
.../pinot/controller/util/TableMetadataReader.java | 22 ++++++
.../pinot/controller/helix/ControllerTest.java | 5 ++
.../core/data/manager/BaseTableDataManager.java | 26 +++++++
.../tests/BaseClusterIntegrationTestSet.java | 48 +++++++++++-
.../local/data/manager/TableDataManager.java | 7 ++
.../immutable/ImmutableSegmentImpl.java | 9 +++
.../pinot/server/api/resources/TablesResource.java | 25 ++++++
.../utils/builder/ControllerRequestURLBuilder.java | 5 ++
14 files changed, 439 insertions(+), 2 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ServerSegmentsReloadCheckResponse.java
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ServerSegmentsReloadCheckResponse.java
new file mode 100644
index 0000000000..2469bda404
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ServerSegmentsReloadCheckResponse.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+/**
+ * Result of the segment reload check as reported by a single server.
+ *
+ * Carries the id of the server instance and a flag telling whether there are any segments to be reloaded on it.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ServerSegmentsReloadCheckResponse {
+  @JsonProperty("needReload")
+  private final boolean _needReload;
+
+  @JsonProperty("instanceId")
+  private final String _instanceId;
+
+  @JsonCreator
+  public ServerSegmentsReloadCheckResponse(@JsonProperty("needReload") boolean needReload,
+      @JsonProperty("instanceId") String instanceId) {
+    _instanceId = instanceId;
+    _needReload = needReload;
+  }
+
+  /** Returns the reload flag this response was created with. */
+  public boolean isNeedReload() {
+    return _needReload;
+  }
+
+  /** Returns the instance id this response was created with. */
+  public String getInstanceId() {
+    return _instanceId;
+  }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableSegmentsReloadCheckResponse.java
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableSegmentsReloadCheckResponse.java
new file mode 100644
index 0000000000..bd201870a4
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableSegmentsReloadCheckResponse.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+
+
+/**
+ * Table level result of the segment reload check.
+ *
+ * Carries the overall reload flag of the table together with a map of the individual
+ * {@link ServerSegmentsReloadCheckResponse}s.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TableSegmentsReloadCheckResponse {
+  // Private and final like the fields of ServerSegmentsReloadCheckResponse: they are assigned once by the creator
+  @JsonProperty("needReload")
+  private final boolean _needReload;
+
+  @JsonProperty("serverToSegmentsCheckReloadList")
+  private final Map<String, ServerSegmentsReloadCheckResponse> _serverToSegmentsCheckReloadList;
+
+  /**
+   * @param needReload reload flag of the table
+   * @param serverToSegmentsCheckReloadList map of the individual server responses
+   */
+  @JsonCreator
+  public TableSegmentsReloadCheckResponse(@JsonProperty("needReload") boolean needReload,
+      @JsonProperty("serverToSegmentsCheckReloadList")
+      Map<String, ServerSegmentsReloadCheckResponse> serverToSegmentsCheckReloadList) {
+    _needReload = needReload;
+    _serverToSegmentsCheckReloadList = serverToSegmentsCheckReloadList;
+  }
+
+  /** Returns the reload flag this response was created with. */
+  public boolean isNeedReload() {
+    return _needReload;
+  }
+
+  /** Returns the map of server responses this response was created with. */
+  public Map<String, ServerSegmentsReloadCheckResponse> getServerToSegmentsCheckReloadList() {
+    return _serverToSegmentsCheckReloadList;
+  }
+}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentsReloadCheckResponseTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentsReloadCheckResponseTest.java
new file mode 100644
index 0000000000..f63a607a12
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentsReloadCheckResponseTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import
org.apache.pinot.common.restlet.resources.ServerSegmentsReloadCheckResponse;
+import
org.apache.pinot.common.restlet.resources.TableSegmentsReloadCheckResponse;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests JSON serialization and deserialization of {@link ServerSegmentsReloadCheckResponse} and
+ * {@link TableSegmentsReloadCheckResponse}.
+ */
+public class SegmentsReloadCheckResponseTest {
+
+  @Test
+  public void testSerialization()
+      throws IOException {
+    // Given
+    boolean needReload = true;
+    String instanceId = "instance123";
+    ServerSegmentsReloadCheckResponse response = new ServerSegmentsReloadCheckResponse(needReload, instanceId);
+    Map<String, ServerSegmentsReloadCheckResponse> serversResponse = new HashMap<>();
+    serversResponse.put(instanceId, response);
+    TableSegmentsReloadCheckResponse tableResponse = new TableSegmentsReloadCheckResponse(needReload, serversResponse);
+
+    // When
+    String responseString = JsonUtils.objectToPrettyString(response);
+    String tableResponseString = JsonUtils.objectToPrettyString(tableResponse);
+
+    // Then: structural checks on the table level response
+    assertNotNull(responseString);
+    assertNotNull(tableResponseString);
+    JsonNode tableResponseJsonNode = JsonUtils.stringToJsonNode(tableResponseString);
+    assertTrue(tableResponseJsonNode.get("needReload").asBoolean());
+
+    JsonNode serversList = tableResponseJsonNode.get("serverToSegmentsCheckReloadList");
+    JsonNode serverResp = serversList.get("instance123");
+    assertEquals(serverResp.get("instanceId").asText(), "instance123");
+    assertTrue(serverResp.get("needReload").asBoolean());
+
+    // Then: exact pretty-printed output. TestNG's assertEquals takes (actual, expected).
+    String expectedTableResponseString = "{\n"
+        + "  \"needReload\" : true,\n"
+        + "  \"serverToSegmentsCheckReloadList\" : {\n"
+        + "    \"instance123\" : {\n"
+        + "      \"needReload\" : true,\n"
+        + "      \"instanceId\" : \"instance123\"\n"
+        + "    }\n"
+        + "  }\n"
+        + "}";
+    String expectedResponseString = "{\n"
+        + "  \"needReload\" : true,\n"
+        + "  \"instanceId\" : \"instance123\"\n"
+        + "}";
+    assertEquals(tableResponseString, expectedTableResponseString);
+    assertEquals(responseString, expectedResponseString);
+  }
+
+  @Test
+  public void testDeserialization()
+      throws Exception {
+    // Given
+    String jsonResponse = "{\n"
+        + "  \"needReload\": false,\n"
+        + "  \"serverToSegmentsCheckReloadList\": {\n"
+        + "    \"Server_10.0.0.215_7050\": {\n"
+        + "      \"needReload\": false,\n"
+        + "      \"instanceId\": \"Server_10.0.0.215_7050\"\n"
+        + "    }\n"
+        + "  }\n"
+        + "}";
+
+    // When
+    TableSegmentsReloadCheckResponse tableReloadResponse =
+        JsonUtils.stringToObject(jsonResponse, new TypeReference<TableSegmentsReloadCheckResponse>() {
+        });
+
+    // Then
+    assertFalse(tableReloadResponse.isNeedReload());
+    Map<String, ServerSegmentsReloadCheckResponse> serverSegmentReloadResp =
+        tableReloadResponse.getServerToSegmentsCheckReloadList();
+    assertNotNull(serverSegmentReloadResp);
+    ServerSegmentsReloadCheckResponse serverResponse = serverSegmentReloadResp.get("Server_10.0.0.215_7050");
+    assertNotNull(serverResponse);
+    assertFalse(serverResponse.isNeedReload());
+    assertEquals(serverResponse.getInstanceId(), "Server_10.0.0.215_7050");
+  }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index c75469d68c..74c09d8da7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -69,6 +69,8 @@ import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import
org.apache.pinot.common.restlet.resources.ServerSegmentsReloadCheckResponse;
+import
org.apache.pinot.common.restlet.resources.TableSegmentsReloadCheckResponse;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.URIUtils;
@@ -822,6 +824,47 @@ public class PinotSegmentRestletResource {
return segmentsMetadata;
}
+  /**
+   * Checks with the servers whether the segments of the given table need to be reloaded.
+   *
+   * The table level flag is true as soon as one server response has "needReload" set. The individual server
+   * responses are only included in the result when {@code verbose} is true; otherwise the map stays empty.
+   *
+   * @return the pretty-printed JSON of the {@link TableSegmentsReloadCheckResponse}
+   */
+  @GET
+  @Path("segments/{tableNameWithType}/needReload")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", action = Actions.Table.GET_METADATA)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Gets the metadata of reload segments check from servers hosting the table", notes =
+      "Returns true if reload is needed on the table from any one of the servers")
+  public String getTableReloadMetadata(
+      @ApiParam(value = "Table name with type", required = true, example = "myTable_REALTIME")
+      @PathParam("tableNameWithType") String tableNameWithType,
+      @QueryParam("verbose") @DefaultValue("false") boolean verbose, @Context HttpHeaders headers) {
+    tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers);
+    LOGGER.info("Received a request to check reload for all servers hosting segments for table {}", tableNameWithType);
+    try {
+      TableMetadataReader tableMetadataReader =
+          new TableMetadataReader(_executor, _connectionManager, _pinotHelixResourceManager);
+      Map<String, JsonNode> needReloadMetadata =
+          tableMetadataReader.getServerCheckSegmentsReloadMetadata(tableNameWithType,
+              _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+      boolean needReload =
+          needReloadMetadata.values().stream().anyMatch(value -> value.get("needReload").booleanValue());
+      // Per-server details are only filled in for verbose requests; the response itself is built the same way
+      Map<String, ServerSegmentsReloadCheckResponse> serverResponses = new HashMap<>();
+      if (verbose) {
+        for (Map.Entry<String, JsonNode> entry : needReloadMetadata.entrySet()) {
+          JsonNode serverMetadata = entry.getValue();
+          serverResponses.put(entry.getKey(),
+              new ServerSegmentsReloadCheckResponse(serverMetadata.get("needReload").booleanValue(),
+                  serverMetadata.get("instanceId").asText()));
+        }
+      }
+      return JsonUtils.objectToPrettyString(new TableSegmentsReloadCheckResponse(needReload, serverResponses));
+    } catch (InvalidConfigException e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Status.BAD_REQUEST);
+    } catch (IOException ioe) {
+      throw new ControllerApplicationException(LOGGER, "Error parsing Pinot server response: " + ioe.getMessage(),
+          Status.INTERNAL_SERVER_ERROR, ioe);
+    }
+  }
+
@GET
@Path("segments/{tableName}/zkmetadata")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.GET_METADATA)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index 18676d581d..5f8f7d3190 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -201,6 +201,17 @@ public class ControllerRequestClient {
}
}
+  /**
+   * Sends the "need reload" check request for the given table to the controller.
+   *
+   * @param tableNameWithType table name with type
+   * @param verbose value of the verbose query parameter; {@code null} is treated as {@code false}
+   * @return the body of the HTTP response
+   * @throws IOException if the request URI is invalid or the request ends with an HTTP error status
+   */
+  public String checkIfReloadIsNeeded(String tableNameWithType, Boolean verbose)
+      throws IOException {
+    // forTableNeedReload takes a primitive boolean; unboxing a null Boolean would throw a NullPointerException
+    boolean verboseFlag = Boolean.TRUE.equals(verbose);
+    try {
+      SimpleHttpResponse simpleHttpResponse = HttpClient.wrapAndThrowHttpException(_httpClient.sendGetRequest(
+          new URI(_controllerRequestURLBuilder.forTableNeedReload(tableNameWithType, verboseFlag)), _headers, null));
+      return simpleHttpResponse.getResponse();
+    } catch (HttpErrorStatusException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
public void reloadSegment(String tableName, String segmentName, boolean
forceReload)
throws IOException {
try {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index b3fd851ff4..781140a978 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -214,6 +214,43 @@ public class ServerSegmentMetadataReader {
return segmentsMetadata;
}
+  /**
+   * This method is called when the API request is to fetch data about segment reload of the table.
+   * It makes a MultiGet call to the needReload endpoint of every given server and returns the response bodies that
+   * were received. The bodies are returned unparsed; interpreting them is left to the caller.
+   * In future additional details like segments list can also be added.
+   *
+   * @param tableNameWithType table name with type
+   * @param serverInstances server instances to query
+   * @param endpoints mapping from server instance to the endpoint used to build the request URL
+   * @param timeoutMs timeout handed to the MultiGet request
+   * @return the response bodies received from the servers
+   */
+  public List<String> getCheckReloadSegmentsFromServer(String tableNameWithType, Set<String> serverInstances,
+      BiMap<String, String> endpoints, int timeoutMs) {
+    LOGGER.debug("Checking if reload is needed on segments from servers for table {}.", tableNameWithType);
+    List<String> serverURLs = new ArrayList<>(serverInstances.size());
+    for (String serverInstance : serverInstances) {
+      serverURLs.add(generateCheckReloadSegmentsServerURL(tableNameWithType, endpoints.get(serverInstance)));
+    }
+    BiMap<String, String> endpointsToServers = endpoints.inverse();
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, endpointsToServers);
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverURLs, tableNameWithType, true, timeoutMs);
+
+    // The bodies are passed on as is, so nothing is parsed here and nothing can fail while collecting them
+    List<String> serversNeedReloadResponses = new ArrayList<>(serviceResponse._httpResponses.values());
+    if (serversNeedReloadResponses.size() < serverURLs.size()) {
+      LOGGER.warn("Received reload check responses for only {} / {} server requests of table {}",
+          serversNeedReloadResponses.size(), serverURLs.size(), tableNameWithType);
+    }
+
+    LOGGER.debug("Retrieved metadata of reload check from servers.");
+    return serversNeedReloadResponses;
+  }
+
/**
* This method is called when the API request is to fetch validDocId
metadata for a list segments of the given table.
* This method will pick one server randomly that hosts the target segment
and fetch the segment metadata result.
@@ -375,6 +412,11 @@ public class ServerSegmentMetadataReader {
return String.format("%s/tables/%s/segments/%s/metadata?%s", endpoint,
tableNameWithType, segmentName, paramsStr);
}
+  /**
+   * Builds the needReload URL of a server for the given table. The table name is URL-encoded as UTF-8.
+   */
+  private String generateCheckReloadSegmentsServerURL(String tableNameWithType, String endpoint) {
+    String encodedTableName = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
+    return endpoint + "/tables/" + encodedTableName + "/segments/needReload";
+  }
+
@Deprecated
private String generateValidDocIdsURL(String tableNameWithType, String
segmentName, String validDocIdsType,
String endpoint) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
index adf7e3a7b7..a7a53d421d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
@@ -23,6 +23,7 @@ import com.google.common.collect.BiMap;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -33,7 +34,9 @@ import
org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
/**
@@ -55,6 +58,25 @@ public class TableMetadataReader {
_pinotHelixResourceManager = helixResourceManager;
}
+  /**
+   * Fetches the segment reload check responses for the given table from the server instances returned by the
+   * helix resource manager.
+   *
+   * @return the parsed responses, keyed by the "instanceId" field of each response
+   */
+  public Map<String, JsonNode> getServerCheckSegmentsReloadMetadata(String tableNameWithType, int timeoutMs)
+      throws InvalidConfigException, IOException {
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    Set<String> serverInstanceSet =
+        new HashSet<>(_pinotHelixResourceManager.getServerInstancesForTable(tableNameWithType, tableType));
+    BiMap<String, String> endpoints = _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverInstanceSet);
+    ServerSegmentMetadataReader metadataReader = new ServerSegmentMetadataReader(_executor, _connectionManager);
+    List<String> serverResponses =
+        metadataReader.getCheckReloadSegmentsFromServer(tableNameWithType, serverInstanceSet, endpoints, timeoutMs);
+
+    Map<String, JsonNode> responseByInstanceId = new HashMap<>();
+    for (String serverResponse : serverResponses) {
+      JsonNode parsedResponse = JsonUtils.stringToJsonNode(serverResponse);
+      String instanceId = parsedResponse.get("instanceId").asText();
+      responseByInstanceId.put(instanceId, parsedResponse);
+    }
+    return responseByInstanceId;
+  }
+
/**
* This api takes in list of segments for which we need the metadata.
*/
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 466de32d25..98ddbac73a 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -721,6 +721,11 @@ public class ControllerTest {
return getControllerRequestClient().reloadTable(tableName,
TableType.OFFLINE, forceDownload);
}
+ /**
+ * Delegates to the controller request client to check if a reload is needed for the given table.
+ *
+ * @param tableNameWithType table name with type
+ * @param verbose passed through unchanged to the controller request client
+ * @return the value returned by the controller request client
+ */
+ public String checkIfReloadIsNeeded(String tableNameWithType, Boolean
verbose)
+ throws IOException {
+ return
getControllerRequestClient().checkIfReloadIsNeeded(tableNameWithType, verbose);
+ }
+
public void reloadOfflineSegment(String tableName, String segmentName,
boolean forceDownload)
throws IOException {
getControllerRequestClient().reloadSegment(tableName, segmentName,
forceDownload);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 4f24dc3d52..56d2cb35d6 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -61,6 +61,7 @@ import
org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
@@ -1024,6 +1025,31 @@ public abstract class BaseTableDataManager implements
TableDataManager {
}
}
+  /**
+   * Scans the acquired segments and reports whether one of them needs a reload. Only segments that are instances
+   * of {@link ImmutableSegmentImpl} are checked, and the scan stops at the first one that needs a reload. All
+   * acquired segments are released before the method returns.
+   */
+  @Override
+  public boolean needReloadSegments()
+      throws Exception {
+    IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
+    List<SegmentDataManager> acquiredSegments = acquireAllSegments();
+    try {
+      for (SegmentDataManager acquiredSegment : acquiredSegments) {
+        IndexSegment indexSegment = acquiredSegment.getSegment();
+        if (!(indexSegment instanceof ImmutableSegmentImpl)) {
+          continue;
+        }
+        if (((ImmutableSegmentImpl) indexSegment).isReloadNeeded(indexLoadingConfig)) {
+          return true;
+        }
+      }
+      return false;
+    } finally {
+      for (SegmentDataManager acquiredSegment : acquiredSegments) {
+        releaseSegment(acquiredSegment);
+      }
+    }
+  }
+
private SegmentDirectory initSegmentDirectory(String segmentName, String
segmentCrc,
IndexLoadingConfig indexLoadingConfig)
throws Exception {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index 09310cb243..ca27ed4ef4 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -732,11 +732,34 @@ public abstract class BaseClusterIntegrationTestSet
extends BaseClusterIntegrati
// Upload the schema with extra columns
addSchema(schema);
-
+ String tableNameWithTypeOffline =
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName);
+ String tableNameWithTypeRealtime =
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName);
// Reload the table
if (includeOfflineTable) {
+ //test controller api which gives responses if reload is needed on any
of the server segments when default
+ // columns are added
+ String needBeforeReloadResponseWithNoVerbose =
checkIfReloadIsNeeded(tableNameWithTypeOffline, false);
+ String needBeforeReloadResponseWithVerbose =
checkIfReloadIsNeeded(tableNameWithTypeOffline, true);
+ JsonNode jsonNeedReloadResponseWithNoVerbose =
JsonUtils.stringToJsonNode(needBeforeReloadResponseWithNoVerbose);
+ JsonNode jsonNeedReloadResponseWithVerbose =
JsonUtils.stringToJsonNode(needBeforeReloadResponseWithVerbose);
+ //test to check if reload is needed i.e true
+
assertTrue(jsonNeedReloadResponseWithNoVerbose.get("needReload").asBoolean());
+
assertTrue(jsonNeedReloadResponseWithVerbose.get("needReload").asBoolean());
+
assertFalse(jsonNeedReloadResponseWithVerbose.get("serverToSegmentsCheckReloadList").isEmpty());
reloadOfflineTable(rawTableName);
}
+ //test controller api which gives responses if reload is needed on any of
the server segments when default
+ // columns are added
+ String needBeforeReloadResponseRealtimeWithNoVerbose =
checkIfReloadIsNeeded(tableNameWithTypeRealtime, false);
+ String needBeforeReloadResponseRealtimeWithVerbose =
checkIfReloadIsNeeded(tableNameWithTypeRealtime, true);
+ JsonNode jsonNeedReloadResponseRealTimeWithNoVerbose =
+
JsonUtils.stringToJsonNode(needBeforeReloadResponseRealtimeWithNoVerbose);
+ JsonNode jsonNeedReloadResponseRealTimeWithVerbose =
+
JsonUtils.stringToJsonNode(needBeforeReloadResponseRealtimeWithVerbose);
+ //test to check if reload is needed i.e true
+
assertTrue(jsonNeedReloadResponseRealTimeWithNoVerbose.get("needReload").asBoolean());
+
assertTrue(jsonNeedReloadResponseRealTimeWithVerbose.get("needReload").asBoolean());
+
assertFalse(jsonNeedReloadResponseRealTimeWithVerbose.get("serverToSegmentsCheckReloadList").isEmpty());
reloadRealtimeTable(rawTableName);
// Wait for all segments to finish reloading, and test querying the new
columns
@@ -762,7 +785,6 @@ public abstract class BaseClusterIntegrationTestSet extends
BaseClusterIntegrati
JsonNode resultTable = queryResponse.get("resultTable");
assertEquals(resultTable.get("dataSchema").get("columnNames").size(),
schema.size());
assertEquals(resultTable.get("rows").size(), 10);
-
// Test aggregation query to include querying all segments (including realtime)
String aggregationQuery = "SELECT SUMMV(NewIntMVDimension) FROM " +
rawTableName;
queryResponse = postQuery(aggregationQuery);
@@ -778,6 +800,28 @@ public abstract class BaseClusterIntegrationTestSet
extends BaseClusterIntegrati
queryResponse = postQuery(countStarQuery);
assertEquals(queryResponse.get("exceptions").size(), 0);
assertEquals(queryResponse.get("resultTable").get("rows").get(0).get(0).asLong(),
countStarResult);
+    if (includeOfflineTable) {
+      String needAfterReloadResponseWithNoVerbose = checkIfReloadIsNeeded(tableNameWithTypeOffline, false);
+      String needAfterReloadResponseWithVerbose = checkIfReloadIsNeeded(tableNameWithTypeOffline, true);
+      JsonNode jsonNeedReloadResponseAfterWithNoVerbose =
+          JsonUtils.stringToJsonNode(needAfterReloadResponseWithNoVerbose);
+      JsonNode jsonNeedReloadResponseAfterWithVerbose = JsonUtils.stringToJsonNode(needAfterReloadResponseWithVerbose);
+      // test to check if reload on offline table is needed i.e false after reload is finished
+      assertFalse(jsonNeedReloadResponseAfterWithNoVerbose.get("needReload").asBoolean());
+      assertFalse(jsonNeedReloadResponseAfterWithVerbose.get("needReload").asBoolean());
+      // Check the offline verbose response fetched after the reload, not the realtime response fetched before it
+      assertFalse(jsonNeedReloadResponseAfterWithVerbose.get("serverToSegmentsCheckReloadList").isEmpty());
+    }
+ String needAfterReloadResponseRealtimeWithNoVerbose =
checkIfReloadIsNeeded(tableNameWithTypeRealtime, false);
+ String needAfterReloadResponseRealTimeWithVerbose =
checkIfReloadIsNeeded(tableNameWithTypeRealtime, true);
+ JsonNode jsonNeedReloadResponseRealtimeAfterWithNoVerbose =
+
JsonUtils.stringToJsonNode(needAfterReloadResponseRealtimeWithNoVerbose);
+ JsonNode jsonNeedReloadResponseRealtimeAfterWithVerbose =
+ JsonUtils.stringToJsonNode(needAfterReloadResponseRealTimeWithVerbose);
+
+ //test to check if reload on real time table is needed i.e false after
reload is finished
+
assertFalse(jsonNeedReloadResponseRealtimeAfterWithNoVerbose.get("needReload").asBoolean());
+
assertFalse(jsonNeedReloadResponseRealtimeAfterWithVerbose.get("needReload").asBoolean());
+
assertFalse(jsonNeedReloadResponseRealtimeAfterWithVerbose.get("serverToSegmentsCheckReloadList").isEmpty());
}
private DimensionFieldSpec constructNewDimension(FieldSpec.DataType
dataType, boolean singleValue) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 677d659fff..480b2ba70b 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -122,6 +122,13 @@ public interface TableDataManager {
*/
boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata,
IndexLoadingConfig indexLoadingConfig);
+ /**
+ * Checks if reload is needed for any of the segments of a table.
+ *
+ * @return true if reload is needed for any of the segments and false otherwise
+ * @throws Exception if the check cannot be completed
+ */
+ boolean needReloadSegments()
+ throws Exception;
+
/**
* Downloads a segment and loads it into the table.
* NOTE: This method is part of the implementation detail of {@link
#addOnlineSegment(String)}.
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index 76129dabb8..14546d7ba6 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -33,6 +33,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
import
org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexContainer;
@@ -173,6 +174,14 @@ public class ImmutableSegmentImpl implements
ImmutableSegment {
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME);
}
+ /**
+ * Returns true if re-processing or reload is needed on this segment.
+ *
+ * The decision is delegated to ImmutableSegmentLoader.needPreprocess, which is given the segment directory, the
+ * index loading config and the schema taken from that config.
+ *
+ * @param indexLoadingConfig index loading config to check the segment against
+ */
+ public boolean isReloadNeeded(IndexLoadingConfig indexLoadingConfig)
+ throws Exception {
+ return ImmutableSegmentLoader.needPreprocess(_segmentDirectory,
indexLoadingConfig, indexLoadingConfig.getSchema());
+ }
+
@Override
public <I extends IndexReader> I getIndex(String column, IndexType<?, I, ?>
type) {
ColumnIndexContainer container = _indexContainerMap.get(column);
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 228ba2277b..ce85ec3f31 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -64,6 +64,7 @@ import
org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.response.server.TableIndexMetadataResponse;
import org.apache.pinot.common.restlet.resources.ResourceUtils;
import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
+import
org.apache.pinot.common.restlet.resources.ServerSegmentsReloadCheckResponse;
import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
import org.apache.pinot.common.restlet.resources.TableSegmentValidationInfo;
import org.apache.pinot.common.restlet.resources.TableSegments;
@@ -954,4 +955,28 @@ public class TablesResource {
}
return new TableSegmentValidationInfo(true, maxEndTimeMs);
}
+
+  /**
+   * Checks whether a reload is needed on any segment of the given table in this server.
+   *
+   * @return the JSON string of a {@link ServerSegmentsReloadCheckResponse} holding the reload flag and the
+   *         instance id of the table data manager
+   * @throws WebApplicationException with status 500 if the reload check fails
+   */
+  @GET
+  @Path("/tables/{tableName}/segments/needReload")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Checks if reload is needed on any segment", notes = "Returns true if reload is required on"
+      + " any segment in this server")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success", response = ServerSegmentsReloadCheckResponse.class),
+      @ApiResponse(code = 500, message = "Internal Server error", response = ErrorInfo.class)
+  })
+  public String checkSegmentsReload(
+      @ApiParam(value = "Table Name with type", required = true) @PathParam("tableName") String tableName,
+      @Context HttpHeaders headers) {
+    tableName = DatabaseUtils.translateTableName(tableName, headers);
+    TableDataManager tableDataManager = ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableName);
+    boolean needReload;
+    try {
+      needReload = tableDataManager.needReloadSegments();
+    } catch (Exception e) {
+      // Keep the original exception as the cause instead of only copying its message
+      throw new WebApplicationException(e.getMessage(), e, Response.Status.INTERNAL_SERVER_ERROR);
+    }
+    return ResourceUtils.convertToJsonString(
+        new ServerSegmentsReloadCheckResponse(needReload, tableDataManager.getInstanceId()));
+  }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index f4133fee59..19bed50b68 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -239,6 +239,11 @@ public class ControllerRequestURLBuilder {
return StringUtil.join("/", _baseUrl, "segments", tableName, query);
}
+  /**
+   * Builds the URL of the "needReload" check of the given table, with the verbose flag as query parameter.
+   */
+  public String forTableNeedReload(String tableNameWithType, boolean verbose) {
+    String needReloadWithQuery = "needReload?verbose=" + verbose;
+    return StringUtil.join("/", _baseUrl, "segments", tableNameWithType, needReloadWithQuery);
+  }
+
public String forTableRebalanceStatus(String jobId) {
return StringUtil.join("/", _baseUrl, "rebalanceStatus", jobId);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]