amrishlal commented on a change in pull request #7174:
URL: https://github.com/apache/pinot/pull/7174#discussion_r691549686
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
##########
@@ -108,4 +111,62 @@ public synchronized void stop() {
_tasksWithValidInterval.parallelStream().forEach(PeriodicTask::stop);
}
}
+
+ /** @return true if task with given name exists; otherwise, false. */
+ public boolean hasTask(String periodicTaskName) {
+ for (PeriodicTask task : _tasksWithValidInterval) {
+ if (task.getTaskName().equals(periodicTaskName)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /** @return List of tasks name that will run periodically. */
+ public List<String> getTaskNameList() {
+ List<String> taskNameList = new ArrayList<>();
+ for (PeriodicTask task : _tasksWithValidInterval) {
+ taskNameList.add(task.getTaskName());
+ }
+ return taskNameList;
+ }
+
+ private PeriodicTask getPeriodicTask(String periodicTaskName) {
+ for (PeriodicTask task : _tasksWithValidInterval) {
+ if (task.getTaskName().equals(periodicTaskName)) {
+ return task;
+ }
+ }
+ return null;
+ }
+
+ /** Execute {@link PeriodicTask} immediately on the specified table. */
+ public void scheduleNow(String periodicTaskName, @Nullable Properties
periodicTaskProperties) {
+ // During controller deployment, each controller can have a slightly
different list of periodic tasks if we add,
+ // remove, or rename periodic task. To avoid this situation, we check
again (besides the check at controller API
+ // level) whether the periodic task exists.
+ PeriodicTask periodicTask = getPeriodicTask(periodicTaskName);
+ if (periodicTask == null) {
+ throw new IllegalArgumentException("Unknown Periodic Task " +
periodicTaskName);
Review comment:
Fixed. Logging an error and returning.
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
##########
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.util.List;
+import java.util.UUID;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.Criteria;
+import org.apache.helix.InstanceType;
+import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.PERIODIC_TASK_TAG)
+@Path("/periodictask")
+public class PinotControllerPeriodicTaskRestletResource {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PinotControllerPeriodicTaskRestletResource.class);
+ private static final String API_REQUEST_ID_PREFIX = "api-";
+
+ @Inject
+ PinotHelixResourceManager _pinotHelixResourceManager;
+
+ @Inject
+ PeriodicTaskScheduler _periodicTaskScheduler;
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/run")
+ @ApiOperation(value = "Run a periodic task against specified table. If no
table name is specified, task will run against all tables.")
+ public String runPeriodicTask(
+ @ApiParam(value = "Periodic task name", required = true)
@QueryParam("taskname") String periodicTaskName,
+ @ApiParam(value = "Table name", required = false)
@QueryParam("tablename") String tableName,
+ @ApiParam(value = "Table type suffix", required = false, example =
"OFFLINE | REALTIME", defaultValue = "OFFLINE") @QueryParam("tabletype") String
tableType) {
+ if (!_periodicTaskScheduler.hasTask(periodicTaskName)) {
+ throw new WebApplicationException("Periodic task '" + periodicTaskName +
"' not found.",
+ Response.Status.NOT_FOUND);
+ }
+
+ if (tableName != null) {
Review comment:
Fixed.
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
##########
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.util.List;
+import java.util.UUID;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.Criteria;
+import org.apache.helix.InstanceType;
+import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.PERIODIC_TASK_TAG)
+@Path("/periodictask")
+public class PinotControllerPeriodicTaskRestletResource {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PinotControllerPeriodicTaskRestletResource.class);
+ private static final String API_REQUEST_ID_PREFIX = "api-";
+
+ @Inject
+ PinotHelixResourceManager _pinotHelixResourceManager;
+
+ @Inject
+ PeriodicTaskScheduler _periodicTaskScheduler;
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/run")
+ @ApiOperation(value = "Run a periodic task against specified table. If no
table name is specified, task will run against all tables.")
+ public String runPeriodicTask(
+ @ApiParam(value = "Periodic task name", required = true)
@QueryParam("taskname") String periodicTaskName,
+ @ApiParam(value = "Table name", required = false)
@QueryParam("tablename") String tableName,
+ @ApiParam(value = "Table type suffix", required = false, example =
"OFFLINE | REALTIME", defaultValue = "OFFLINE") @QueryParam("tabletype") String
tableType) {
+ if (!_periodicTaskScheduler.hasTask(periodicTaskName)) {
+ throw new WebApplicationException("Periodic task '" + periodicTaskName +
"' not found.",
+ Response.Status.NOT_FOUND);
+ }
+
+ if (tableName != null) {
+ tableName = tableName.trim();
+ if (tableName.length() > 0 &&
!_pinotHelixResourceManager.getAllRawTables().contains(tableName)) {
+ throw new WebApplicationException("Table '" + tableName + "' not
found.", Response.Status.NOT_FOUND);
+ }
+ }
+
+ // Generate an id for this request by taking first eight characters of a
randomly generated UUID. This request id
+ // is returned to the user and also appended to log messages so that user
can locate all log messages associated
+ // with this PeriodicTask's execution.
+ String periodicTaskRequestId = API_REQUEST_ID_PREFIX +
UUID.randomUUID().toString().substring(0,8);
Review comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]