This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c9e08f367a Updates forceCommit APIs to handle Pauseless (#14828)
c9e08f367a is described below
commit c9e08f367ab1e89e7e54bdc11a70989fdc5f8913
Author: NOOB <[email protected]>
AuthorDate: Fri Jan 24 05:59:46 2025 +0530
Updates forceCommit APIs to handle Pauseless (#14828)
---
.../api/resources/PinotRealtimeTableResource.java | 28 ++++++++++++-------
.../realtime/PinotLLCRealtimeSegmentManager.java | 16 +++++++++++
.../PinotLLCRealtimeSegmentManagerTest.java | 31 ++++++++++++++++++++++
.../tests/LLCRealtimeClusterIntegrationTest.java | 25 +++++++++++++++--
.../LLCRealtimeKafka3ClusterIntegrationTest.java | 25 +++++++++++++++--
.../apache/pinot/spi/utils/CommonConstants.java | 1 +
6 files changed, 113 insertions(+), 13 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index 2ab15427f7..f4a0e633a0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -29,7 +29,6 @@ import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -222,15 +221,26 @@ public class PinotRealtimeTableResource {
String tableNameWithType =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
Set<String> consumingSegmentCommitted = JsonUtils.stringToObject(
controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST),
Set.class);
- Set<String> onlineSegmentsForTable =
-
_pinotHelixResourceManager.getOnlineSegmentsFromIdealState(tableNameWithType,
false);
- Set<String> segmentsYetToBeCommitted = new HashSet<>();
- consumingSegmentCommitted.forEach(segmentName -> {
- if (!onlineSegmentsForTable.contains(segmentName)) {
- segmentsYetToBeCommitted.add(segmentName);
- }
- });
+ Set<String> segmentsToCheck;
+ String segmentsPendingToBeComittedString =
+
controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST);
+
+ if (segmentsPendingToBeComittedString != null) {
+ segmentsToCheck =
JsonUtils.stringToObject(segmentsPendingToBeComittedString, Set.class);
+ } else {
+ segmentsToCheck = consumingSegmentCommitted;
+ }
+
+ Set<String> segmentsYetToBeCommitted =
+
_pinotLLCRealtimeSegmentManager.getSegmentsYetToBeCommitted(tableNameWithType,
segmentsToCheck);
+
+ if (segmentsYetToBeCommitted.size() < segmentsToCheck.size()) {
+
controllerJobZKMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST,
+ JsonUtils.objectToString(segmentsYetToBeCommitted));
+ _pinotHelixResourceManager.addControllerJobToZK(forceCommitJobId,
controllerJobZKMetadata,
+ ControllerJobType.FORCE_COMMIT, prev -> true);
+ }
Map<String, Object> result = new HashMap<>(controllerJobZKMetadata);
result.put("segmentsYetToBeCommitted", segmentsYetToBeCommitted);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 3ed88967c6..2b9cf8f954 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -2009,4 +2009,20 @@ public class PinotLLCRealtimeSegmentManager {
URI createSegmentPath(String rawTableName, String segmentName) {
return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName,
URIUtils.encode(segmentName));
}
+
+  /**
+   * Returns the subset of {@code segmentsToCheck} whose segment ZK metadata status is not {@link Status#DONE}.
+   *
+   * <p>Segments for which no ZK metadata is found are skipped (the segment is assumed to have been deleted),
+   * so they are never reported as pending.
+   *
+   * @param tableNameWithType table name (with type suffix) used to look up segment ZK metadata
+   * @param segmentsToCheck segment names to inspect
+   * @return a new mutable set containing the segments that are not yet {@link Status#DONE}
+   */
+  public Set<String> getSegmentsYetToBeCommitted(String tableNameWithType, Set<String> segmentsToCheck) {
+    Set<String> segmentsYetToBeCommitted = new HashSet<>();
+    for (String segmentName : segmentsToCheck) {
+      SegmentZKMetadata segmentZKMetadata =
+          _helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName);
+      if (segmentZKMetadata == null) {
+        // Segment is deleted. No need to track this segment among segments yetToBeCommitted.
+        continue;
+      }
+      // Enum constants are singletons, so identity comparison is correct and, unlike getStatus().equals(...),
+      // cannot throw if the status is ever null.
+      if (segmentZKMetadata.getStatus() != Status.DONE) {
+        segmentsYetToBeCommitted.add(segmentName);
+      }
+    }
+    return segmentsYetToBeCommitted;
+  }
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index dbe640d364..d5969e611f 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.controller.helix.core.realtime;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
@@ -1247,6 +1248,36 @@ public class PinotLLCRealtimeSegmentManagerTest {
Assert.assertEquals(partitionIds.size(), 2);
}
+  /**
+   * Verifies that only segments whose ZK metadata status is not DONE are reported as pending, and that
+   * segments without ZK metadata are ignored.
+   */
+  @Test
+  public void getSegmentsYetToBeCommitted() {
+    PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
+    FakePinotLLCRealtimeSegmentManager realtimeSegmentManager =
+        new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
+
+    SegmentZKMetadata mockSegmentZKMetadataDone = mock(SegmentZKMetadata.class);
+    when(mockSegmentZKMetadataDone.getStatus()).thenReturn(Status.DONE);
+
+    SegmentZKMetadata mockSegmentZKMetadataUploaded = mock(SegmentZKMetadata.class);
+    when(mockSegmentZKMetadataUploaded.getStatus()).thenReturn(Status.UPLOADED);
+
+    SegmentZKMetadata mockSegmentZKMetadataInProgress = mock(SegmentZKMetadata.class);
+    when(mockSegmentZKMetadataInProgress.getStatus()).thenReturn(Status.IN_PROGRESS);
+
+    SegmentZKMetadata mockSegmentZKMetadataInCommitting = mock(SegmentZKMetadata.class);
+    when(mockSegmentZKMetadataInCommitting.getStatus()).thenReturn(Status.COMMITTING);
+
+    // s0, s3: DONE -> excluded.
+    when(mockHelixResourceManager.getSegmentZKMetadata("test", "s0")).thenReturn(mockSegmentZKMetadataDone);
+    when(mockHelixResourceManager.getSegmentZKMetadata("test", "s3")).thenReturn(mockSegmentZKMetadataDone);
+    // s2: UPLOADED, s4: IN_PROGRESS, s5: COMMITTING -> not DONE, so reported as pending.
+    when(mockHelixResourceManager.getSegmentZKMetadata("test", "s2")).thenReturn(mockSegmentZKMetadataUploaded);
+    when(mockHelixResourceManager.getSegmentZKMetadata("test", "s4")).thenReturn(mockSegmentZKMetadataInProgress);
+    when(mockHelixResourceManager.getSegmentZKMetadata("test", "s5")).thenReturn(mockSegmentZKMetadataInCommitting);
+    // s1: no ZK metadata (deleted) -> ignored.
+    when(mockHelixResourceManager.getSegmentZKMetadata("test", "s1")).thenReturn(null);
+
+    Set<String> segmentsToCheck = ImmutableSet.of("s0", "s1", "s2", "s3", "s4", "s5");
+    Set<String> segmentsYetToBeCommitted =
+        realtimeSegmentManager.getSegmentsYetToBeCommitted("test", segmentsToCheck);
+    // Use TestNG's assertion (as elsewhere in this class) rather than the `assert` keyword, which is skipped
+    // when the JVM runs without -ea and gives no diff on failure.
+    Assert.assertEquals(segmentsYetToBeCommitted, ImmutableSet.of("s2", "s4", "s5"));
+  }
+
//////////////////////////////////////////////////////////////////////////////////
// Fake classes
/////////////////////////////////////////////////////////////////////////////////
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 78b34fc563..1765550641 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -41,6 +41,7 @@ import org.apache.helix.HelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.HashUtil;
@@ -427,12 +428,18 @@ public class LLCRealtimeClusterIntegrationTest extends
BaseRealtimeClusterIntegr
throws Exception {
Set<String> consumingSegments =
getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME");
String jobId = forceCommit(getTableName());
+ Map<String, String> jobMetadata =
+ _helixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobType.FORCE_COMMIT);
+ assert jobMetadata != null;
+ assert jobMetadata.get("segmentsForceCommitted") != null;
TestUtils.waitForCondition(aVoid -> {
try {
if (isForceCommitJobCompleted(jobId)) {
- assertTrue(_controllerStarter.getHelixResourceManager()
- .getOnlineSegmentsFromIdealState(getTableName() + "_REALTIME",
false).containsAll(consumingSegments));
+ for (String segmentName : consumingSegments) {
+ assertEquals(CommonConstants.Segment.Realtime.Status.DONE,
_controllerStarter.getHelixResourceManager()
+ .getSegmentZKMetadata(getTableName() + "_REALTIME",
segmentName).getStatus());
+ }
return true;
}
return false;
@@ -462,6 +469,20 @@ public class LLCRealtimeClusterIntegrationTest extends
BaseRealtimeClusterIntegr
assertEquals(jobStatus.get("jobId").asText(), forceCommitJobId);
assertEquals(jobStatus.get("jobType").asText(), "FORCE_COMMIT");
+
+ assert
jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST)
!= null;
+ assert
jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST)
!= null;
+
+ Set<String> allSegments = JsonUtils.stringToObject(
+
jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST).asText(),
HashSet.class);
+ Set<String> segmentsPending = new HashSet<>();
+ for (JsonNode element :
jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST))
{
+ segmentsPending.add(element.asText());
+ }
+
+ assert segmentsPending.size() <= allSegments.size();
+ assert jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) ==
segmentsPending.size();
+
return jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0;
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest.java
index dce404d64d..e61cb07c69 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest.java
@@ -41,6 +41,7 @@ import org.apache.helix.HelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.HashUtil;
@@ -395,12 +396,18 @@ public class LLCRealtimeKafka3ClusterIntegrationTest
extends BaseRealtimeCluster
throws Exception {
Set<String> consumingSegments =
getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME");
String jobId = forceCommit(getTableName());
+ Map<String, String> jobMetadata =
+ _helixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobType.FORCE_COMMIT);
+ assert jobMetadata != null;
+ assert jobMetadata.get("segmentsForceCommitted") != null;
TestUtils.waitForCondition(aVoid -> {
try {
if (isForceCommitJobCompleted(jobId)) {
- assertTrue(_controllerStarter.getHelixResourceManager()
- .getOnlineSegmentsFromIdealState(getTableName() + "_REALTIME",
false).containsAll(consumingSegments));
+ for (String segmentName : consumingSegments) {
+ assertEquals(CommonConstants.Segment.Realtime.Status.DONE,
_controllerStarter.getHelixResourceManager()
+ .getSegmentZKMetadata(getTableName() + "_REALTIME",
segmentName).getStatus());
+ }
return true;
}
return false;
@@ -430,6 +437,20 @@ public class LLCRealtimeKafka3ClusterIntegrationTest
extends BaseRealtimeCluster
assertEquals(jobStatus.get("jobId").asText(), forceCommitJobId);
assertEquals(jobStatus.get("jobType").asText(), "FORCE_COMMIT");
+
+ assert
jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST)
!= null;
+ assert
jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST)
!= null;
+
+ Set<String> allSegments = JsonUtils.stringToObject(
+
jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST).asText(),
HashSet.class);
+ Set<String> segmentsPending = new HashSet<>();
+ for (JsonNode element :
jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST))
{
+ segmentsPending.add(element.asText());
+ }
+
+ assert segmentsPending.size() <= allSegments.size();
+ assert jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) ==
segmentsPending.size();
+
return jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0;
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 30f4b44e27..ecb26b5dfd 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1039,6 +1039,7 @@ public class CommonConstants {
public static final String SEGMENT_RELOAD_JOB_INSTANCE_NAME =
"instanceName";
// Force commit job ZK props
public static final String CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST =
"segmentsForceCommitted";
+ public static final String CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST =
"segmentsYetToBeCommitted";
}
// prefix for scheduler related features, e.g. query accountant
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]