This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 142fb45c82 Allow periodic tasks to run with properties via the POST
API (#14915)
142fb45c82 is described below
commit 142fb45c8295d1192d34b7caca307593c46557e9
Author: 9aman <[email protected]>
AuthorDate: Wed Apr 2 19:43:50 2025 +0530
Allow periodic tasks to run with properties via the POST API (#14915)
---
...PinotControllerPeriodicTaskRestletResource.java | 39 ++++++++++++++++++++++
.../RealtimeSegmentValidationManager.java | 23 +++++++++++--
2 files changed, 60 insertions(+), 2 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
index 906356fba9..ac92b5ba56 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
@@ -26,8 +26,10 @@ import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.List;
+import java.util.Map;
import javax.inject.Inject;
import javax.ws.rs.GET;
+import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
@@ -103,6 +105,43 @@ public class PinotControllerPeriodicTaskRestletResource {
.build();
}
+  /**
+   * Triggers a controller periodic task on demand, forwarding caller-supplied task properties.
+   *
+   * <p>When {@code tableName} is provided, it is translated using the request headers and must resolve to exactly
+   * one existing table name with type. When it is omitted, {@code null} is passed through as the table name (per
+   * the API description, the task then runs against all tables).
+   *
+   * @param periodicTaskName name of the periodic task; must be known to the periodic task scheduler
+   * @param tableName optional table name
+   * @param tableType optional table type filter ("OFFLINE" or "REALTIME")
+   * @param taskProperties task properties taken from the request entity; forwarded unchanged
+   * @param headers request headers, used to translate the table name
+   * @return a 200 response whose entity is the result of invoking the periodic task
+   * @throws WebApplicationException with 404 if the task is unknown, or 400 if the table name matches more than
+   *     one table
+   */
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/run")
+  @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.EXECUTE_TASK)
+  @ApiOperation(value = "Run periodic task against table with custom properties. If table name is missing, task will "
+      + "run against all tables.")
+  public Response runPeriodicTaskWithProperties(
+      @ApiParam(value = "Periodic task name", required = true) @QueryParam("taskname") String periodicTaskName,
+      @ApiParam(value = "Name of the table") @QueryParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE | REALTIME") @QueryParam("type") String tableType,
+      @ApiParam(value = "Task properties") Map<String, String> taskProperties,
+      @Context HttpHeaders headers) {
+
+    if (!_periodicTaskScheduler.hasTask(periodicTaskName)) {
+      throw new WebApplicationException("Periodic task '" + periodicTaskName + "' not found.",
+          Response.Status.NOT_FOUND);
+    }
+
+    if (tableName != null) {
+      tableName = DatabaseUtils.translateTableName(tableName, headers);
+      List<String> matchingTableNamesWithType =
+          ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName,
+              Constants.validateTableType(tableType), LOGGER);
+
+      if (matchingTableNamesWithType.size() > 1) {
+        // An ambiguous table name is a caller error. The message-only WebApplicationException constructor would
+        // report it as HTTP 500, so the status is set explicitly.
+        throw new WebApplicationException(
+            "More than one table matches Table '" + tableName + "'. Matching names: " + matchingTableNamesWithType,
+            Response.Status.BAD_REQUEST);
+      }
+
+      // NOTE(review): assumes getExistingTableNamesWithType never returns an empty list (presumably it throws when
+      // no table matches) - confirm, otherwise get(0) fails with IndexOutOfBoundsException.
+      tableName = matchingTableNamesWithType.get(0);
+    }
+
+    return Response.ok()
+        .entity(_pinotHelixResourceManager.invokeControllerPeriodicTask(tableName, periodicTaskName, taskProperties))
+        .build();
+  }
+
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/names")
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 2fcb669335..9fa236ecde 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -65,6 +66,7 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
private final boolean _segmentAutoResetOnErrorAtValidation;
public static final String OFFSET_CRITERIA = "offsetCriteria";
+ public static final String RUN_SEGMENT_LEVEL_VALIDATION =
"runSegmentLevelValidation";
public RealtimeSegmentValidationManager(ControllerConf config,
PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager,
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
@@ -89,8 +91,7 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
Context context = new Context();
// Run segment level validation only if certain time has passed after previous run
long currentTimeMs = System.currentTimeMillis();
- if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs -
_lastSegmentLevelValidationRunTimeMs)
- >= _segmentLevelValidationIntervalInSeconds) {
+ if (shouldRunSegmentValidation(periodicTaskProperties, currentTimeMs)) {
LOGGER.info("Run segment-level validation");
context._runSegmentLevelValidation = true;
_lastSegmentLevelValidationRunTimeMs = currentTimeMs;
@@ -242,6 +243,24 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
}
}
+  /**
+   * Decides whether the current run should perform segment-level validation.
+   *
+   * <p>Validation runs when the {@link #RUN_SEGMENT_LEVEL_VALIDATION} task property parses to {@code true}, or
+   * when at least {@code _segmentLevelValidationIntervalInSeconds} seconds have elapsed since
+   * {@code _lastSegmentLevelValidationRunTimeMs}.
+   *
+   * @param periodicTaskProperties properties supplied for this task run; must not be null
+   * @param currentTimeMs current wall-clock time in milliseconds
+   * @return {@code true} if segment-level validation should run
+   */
+  private boolean shouldRunSegmentValidation(Properties periodicTaskProperties, long currentTimeMs) {
+    // Boolean.parseBoolean never throws and returns false for null, so a missing or non-"true" property simply
+    // falls through to the time-based check.
+    if (Boolean.parseBoolean(periodicTaskProperties.getProperty(RUN_SEGMENT_LEVEL_VALIDATION))) {
+      return true;
+    }
+
+    long secondsSinceLastRun =
+        TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - _lastSegmentLevelValidationRunTimeMs);
+    return secondsSinceLastRun >= _segmentLevelValidationIntervalInSeconds;
+  }
+
@Override
protected void nonLeaderCleanup(List<String> tableNamesWithType) {
for (String tableNameWithType : tableNamesWithType) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]