Github user michal-databricks commented on a diff in the pull request: https://github.com/apache/spark/pull/21688#discussion_r222977948 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala --- @@ -102,4 +103,159 @@ private[v1] class StagesResource extends BaseAppResource { withUI(_.store.taskList(stageId, stageAttemptId, offset, length, sortBy)) } + // This api needs to stay formatted exactly as it is below, since, it is being used by the + // datatables for the stages page. + @GET + @Path("{stageId: \\d+}/{stageAttemptId: \\d+}/taskTable") + def taskTable( + @PathParam("stageId") stageId: Int, + @PathParam("stageAttemptId") stageAttemptId: Int, + @QueryParam("details") @DefaultValue("true") details: Boolean, + @Context uriInfo: UriInfo): + HashMap[String, Object] = { + withUI { ui => + val uriQueryParameters = uriInfo.getQueryParameters(true) + val totalRecords = uriQueryParameters.getFirst("numTasks") + var isSearch = false + var searchValue: String = null + var filteredRecords = totalRecords + var _tasksToShow: Seq[TaskData] = null + if (uriQueryParameters.getFirst("search[value]") != null && + uriQueryParameters.getFirst("search[value]").length > 0) { + _tasksToShow = doPagination(uriQueryParameters, stageId, stageAttemptId, true, + totalRecords.toInt) + isSearch = true + searchValue = uriQueryParameters.getFirst("search[value]") + } else { + _tasksToShow = doPagination(uriQueryParameters, stageId, stageAttemptId, false, + totalRecords.toInt) + } + val ret = new HashMap[String, Object]() + if (_tasksToShow.nonEmpty) { + // Performs server-side search based on input from user + if (isSearch) { + val filteredTaskList = filterTaskList(_tasksToShow, searchValue) + filteredRecords = filteredTaskList.length.toString + if (filteredTaskList.length > 0) { + val pageStartIndex = uriQueryParameters.getFirst("start").toInt + val pageLength = uriQueryParameters.getFirst("length").toInt + ret.put("aaData", filteredTaskList.slice(pageStartIndex, pageStartIndex + pageLength)) + } else { + ret.put("aaData", filteredTaskList) + } + } else { + ret.put("aaData", _tasksToShow) + } + } else { + ret.put("aaData", _tasksToShow) + } + ret.put("recordsTotal", totalRecords) + ret.put("recordsFiltered", filteredRecords) + ret + } + } + + // Performs pagination on the server side + def doPagination(queryParameters: MultivaluedMap[String, String], stageId: Int, + stageAttemptId: Int, isSearch: Boolean, totalRecords: Int): Seq[TaskData] = { + val queryParams = queryParameters.keySet() + var columnToSort = 0 + if (queryParams.contains("order[0][column]")) { + columnToSort = queryParameters.getFirst("order[0][column]").toInt + } + var columnNameToSort = queryParameters.getFirst("columns[" + columnToSort + "][name]") + if (columnNameToSort.equalsIgnoreCase("Logs")) { + columnNameToSort = "Index" + columnToSort = 0 + } + val isAscendingStr = queryParameters.getFirst("order[0][dir]") + var pageStartIndex = 0 + var pageLength = totalRecords + if (!isSearch) { + pageStartIndex = queryParameters.getFirst("start").toInt + pageLength = queryParameters.getFirst("length").toInt + } + return withUI(_.store.taskList(stageId, stageAttemptId, pageStartIndex, pageLength, + indexName(columnNameToSort), isAscendingStr.equalsIgnoreCase("asc"))) + } + + // Filters task list based on search parameter + def filterTaskList( + taskDataList: Seq[TaskData], + searchValue: String): Seq[TaskData] = { + val defaultOptionString: String = "d" + // The task metrics dummy object below has been added to avoid throwing exception in cases + // when task metrics for a particular task do not exist as of yet + val dummyTaskMetrics: TaskMetrics = new TaskMetrics(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + new InputMetrics(0, 0), new OutputMetrics(0, 0), + new ShuffleReadMetrics(0, 0, 0, 0, 0, 0, 0), new ShuffleWriteMetrics(0, 0, 0)) + val searchValueLowerCase = searchValue.toLowerCase(Locale.ROOT) + val filteredTaskDataSequence: Seq[TaskData] = taskDataList.filter(f => --- End diff -- This function should have `val containsValue = (haystack: Any) => haystack.toString.toLowerCase(Locale.ROOT).contains(searchValueLowerCase)` to repetition in all this code below.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org