This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7247da84bc When reading server to segments map, exclude OFFLINE
segments (#11818)
7247da84bc is described below
commit 7247da84bcb51c49c8e45354d3e9c2ec05e3e8df
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Oct 16 19:19:18 2023 -0700
When reading server to segments map, exclude OFFLINE segments (#11818)
---
.../api/resources/PinotSegmentRestletResource.java | 8 ++--
.../helix/core/PinotHelixResourceManager.java | 52 +++++++---------------
.../util/ConsumingSegmentInfoReader.java | 8 ++--
.../ConsumingSegmentInfoReaderStatelessTest.java | 6 +--
4 files changed, 28 insertions(+), 46 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 33d4b65b5d..7a5961c680 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -34,12 +34,12 @@ import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -661,13 +661,13 @@ public class PinotSegmentRestletResource {
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
if (singleSegmentName != null) {
// No need to query servers where this segment is not supposed to be
hosted
- serverToSegments = new HashMap<>();
- List<String> segmentList = Arrays.asList(singleSegmentName);
+ serverToSegments = new TreeMap<>();
+ List<String> segmentList = Collections.singletonList(singleSegmentName);
_pinotHelixResourceManager.getServers(tableNameWithType,
singleSegmentName).forEach(server -> {
serverToSegments.put(server, segmentList);
});
} else {
- serverToSegments =
_pinotHelixResourceManager.getServerToOnlineSegmentsMap(tableNameWithType);
+ serverToSegments =
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
}
BiMap<String, String> serverEndPoints =
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index c3fde8a85d..bfb4918bf7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2651,29 +2651,8 @@ public class PinotHelixResourceManager {
}
/**
- * Returns a map from server instance to list of online segments it serves
for the given table.
- */
- public Map<String, List<String>> getServerToOnlineSegmentsMap(String
tableNameWithType) {
- Map<String, List<String>> serverToSegmentsMap = new TreeMap<>();
- IdealState idealState =
_helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
- if (idealState == null) {
- throw new IllegalStateException("Ideal state does not exist for table: "
+ tableNameWithType);
- }
- for (Map.Entry<String, Map<String, String>> entry :
idealState.getRecord().getMapFields().entrySet()) {
- String segmentName = entry.getKey();
- for (Map.Entry<String, String> e : entry.getValue().entrySet()) {
- String server = e.getKey();
- String status = e.getValue();
- if (status.equals(SegmentStateModel.CONSUMING) ||
status.equals(SegmentStateModel.ONLINE)) {
- serverToSegmentsMap.computeIfAbsent(server, key -> new
ArrayList<>()).add(segmentName);
- }
- }
- }
- return serverToSegmentsMap;
- }
-
- /**
- * Returns a map from server instance to list of segments it serves for the
given table.
+ * Returns a map from server instance to list of segments it serves for the
given table. Ignore OFFLINE segments from
+ * the ideal state because they are not supposed to be served.
*/
public Map<String, List<String>> getServerToSegmentsMap(String
tableNameWithType) {
Map<String, List<String>> serverToSegmentsMap = new TreeMap<>();
@@ -2683,15 +2662,18 @@ public class PinotHelixResourceManager {
}
for (Map.Entry<String, Map<String, String>> entry :
idealState.getRecord().getMapFields().entrySet()) {
String segmentName = entry.getKey();
- for (String server : entry.getValue().keySet()) {
- serverToSegmentsMap.computeIfAbsent(server, key -> new
ArrayList<>()).add(segmentName);
+ for (Map.Entry<String, String> instanceStateEntry :
entry.getValue().entrySet()) {
+ if (!instanceStateEntry.getValue().equals(SegmentStateModel.OFFLINE)) {
+ serverToSegmentsMap.computeIfAbsent(instanceStateEntry.getKey(), key
-> new ArrayList<>()).add(segmentName);
+ }
}
}
return serverToSegmentsMap;
}
/**
- * Returns a set of server instances for a given table and segment
+ * Returns a set of server instances for a given table and segment. Ignore
OFFLINE segments from the ideal state
+ * because they are not supposed to be served.
*/
public Set<String> getServers(String tableNameWithType, String segmentName) {
IdealState idealState =
_helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
@@ -2701,7 +2683,13 @@ public class PinotHelixResourceManager {
Map<String, String> instanceStateMap =
idealState.getInstanceStateMap(segmentName);
Preconditions.checkState(instanceStateMap != null, "Segment: {} does not
exist in the ideal state of table: {}",
segmentName, tableNameWithType);
- return instanceStateMap.keySet();
+ Set<String> servers = new TreeSet<>();
+ for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
+ if (!entry.getValue().equals(SegmentStateModel.OFFLINE)) {
+ servers.add(entry.getKey());
+ }
+ }
+ return servers;
}
/**
@@ -2722,15 +2710,9 @@ public class PinotHelixResourceManager {
return consumingSegments;
}
- /**
- * Utility function to return set of servers corresponding to a given
segment.
- */
+ @Deprecated
public Set<String> getServersForSegment(String tableNameWithType, String
segmentName) {
- IdealState idealState =
_helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
- if (idealState == null) {
- throw new IllegalStateException("Ideal state does not exist for table: "
+ tableNameWithType);
- }
- return new HashSet<>(idealState.getInstanceStateMap(segmentName).keySet());
+ return getServers(tableNameWithType, segmentName);
}
public synchronized Map<String, String> getSegmentsCrcForTable(String
tableNameWithType) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
index c8ee931580..26dab01955 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
@@ -157,13 +157,13 @@ public class ConsumingSegmentInfoReader {
}
// Check if any responses are missing
- Set<String> serversForSegment =
_pinotHelixResourceManager.getServersForSegment(tableNameWithType, segmentName);
- if (serversForSegment.size() != consumingSegmentInfoList.size()) {
+ Set<String> servers =
_pinotHelixResourceManager.getServers(tableNameWithType, segmentName);
+ if (servers.size() != consumingSegmentInfoList.size()) {
Set<String> serversResponded =
consumingSegmentInfoList.stream().map(c ->
c._serverName).collect(Collectors.toSet());
- serversForSegment.removeAll(serversResponded);
+ servers.removeAll(serversResponded);
String errorMessage =
- "Not all servers responded for segment: " + segmentName + "
Missing servers : " + serversForSegment;
+ "Not all servers responded for segment: " + segmentName + "
Missing servers : " + servers;
return
TableStatus.IngestionStatus.newIngestionStatus(TableStatus.IngestionState.UNHEALTHY,
errorMessage);
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java
index 3e8d935813..9c29142b46 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java
@@ -28,10 +28,10 @@ import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@@ -197,11 +197,11 @@ public class ConsumingSegmentInfoReaderStatelessTest {
private void mockSetup(final String[] servers, final Set<String>
consumingSegments)
throws InvalidConfigException {
when(_helix.getServerToSegmentsMap(anyString())).thenAnswer(invocationOnMock ->
subsetOfServerSegments(servers));
+ when(_helix.getServers(anyString(), anyString())).thenAnswer(
+ invocationOnMock -> new TreeSet<>(Arrays.asList(servers)));
when(_helix.getDataInstanceAdminEndpoints(ArgumentMatchers.anySet())).thenAnswer(
invocationOnMock -> serverEndpoints(servers));
when(_helix.getConsumingSegments(anyString())).thenAnswer(invocationOnMock
-> consumingSegments);
- when(_helix.getServersForSegment(anyString(), anyString())).thenAnswer(
- invocationOnMock -> new HashSet<>(Arrays.asList(servers)));
}
private ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap testRunner(final
String[] servers,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]