Github user pgandhi999 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21688#discussion_r223124830
  
    --- Diff: 
core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala ---
    @@ -102,4 +103,159 @@ private[v1] class StagesResource extends 
BaseAppResource {
         withUI(_.store.taskList(stageId, stageAttemptId, offset, length, 
sortBy))
       }
     
    +  // This api needs to stay formatted exactly as it is below, since, it is 
being used by the
    +  // datatables for the stages page.
    +  @GET
    +  @Path("{stageId: \\d+}/{stageAttemptId: \\d+}/taskTable")
    +  def taskTable(
    +    @PathParam("stageId") stageId: Int,
    +    @PathParam("stageAttemptId") stageAttemptId: Int,
    +    @QueryParam("details") @DefaultValue("true") details: Boolean,
    +    @Context uriInfo: UriInfo):
    +  HashMap[String, Object] = {
    +    withUI { ui =>
    +      val uriQueryParameters = uriInfo.getQueryParameters(true)
    +      val totalRecords = uriQueryParameters.getFirst("numTasks")
    +      var isSearch = false
    +      var searchValue: String = null
    +      var filteredRecords = totalRecords
    +      var _tasksToShow: Seq[TaskData] = null
    +      if (uriQueryParameters.getFirst("search[value]") != null &&
    +        uriQueryParameters.getFirst("search[value]").length > 0) {
    +        _tasksToShow = doPagination(uriQueryParameters, stageId, 
stageAttemptId, true,
    +          totalRecords.toInt)
    +        isSearch = true
    +        searchValue = uriQueryParameters.getFirst("search[value]")
    +      } else {
    +        _tasksToShow = doPagination(uriQueryParameters, stageId, 
stageAttemptId, false,
    +          totalRecords.toInt)
    +      }
    +      val ret = new HashMap[String, Object]()
    +      if (_tasksToShow.nonEmpty) {
    +        // Performs server-side search based on input from user
    +        if (isSearch) {
    +          val filteredTaskList = filterTaskList(_tasksToShow, searchValue)
    +          filteredRecords = filteredTaskList.length.toString
    +          if (filteredTaskList.length > 0) {
    +            val pageStartIndex = uriQueryParameters.getFirst("start").toInt
    +            val pageLength = uriQueryParameters.getFirst("length").toInt
    +            ret.put("aaData", filteredTaskList.slice(pageStartIndex, 
pageStartIndex + pageLength))
    +          } else {
    +            ret.put("aaData", filteredTaskList)
    +          }
    +        } else {
    +          ret.put("aaData", _tasksToShow)
    +        }
    +      } else {
    +        ret.put("aaData", _tasksToShow)
    +      }
    +      ret.put("recordsTotal", totalRecords)
    +      ret.put("recordsFiltered", filteredRecords)
    +      ret
    +    }
    +  }
    +
    +  // Performs pagination on the server side
    +  def doPagination(queryParameters: MultivaluedMap[String, String], 
stageId: Int,
    +    stageAttemptId: Int, isSearch: Boolean, totalRecords: Int): 
Seq[TaskData] = {
    +    val queryParams = queryParameters.keySet()
    +    var columnToSort = 0
    +    if (queryParams.contains("order[0][column]")) {
    +      columnToSort = queryParameters.getFirst("order[0][column]").toInt
    +    }
    +    var columnNameToSort = queryParameters.getFirst("columns[" + 
columnToSort + "][name]")
    +    if (columnNameToSort.equalsIgnoreCase("Logs")) {
    +      columnNameToSort = "Index"
    +      columnToSort = 0
    +    }
    +    val isAscendingStr = queryParameters.getFirst("order[0][dir]")
    +    var pageStartIndex = 0
    +    var pageLength = totalRecords
    +    if (!isSearch) {
    +      pageStartIndex = queryParameters.getFirst("start").toInt
    +      pageLength = queryParameters.getFirst("length").toInt
    +    }
    +    return withUI(_.store.taskList(stageId, stageAttemptId, 
pageStartIndex, pageLength,
    +      indexName(columnNameToSort), isAscendingStr.equalsIgnoreCase("asc")))
    +  }
    +
    +  // Filters task list based on search parameter
    +  def filterTaskList(
    +    taskDataList: Seq[TaskData],
    +    searchValue: String): Seq[TaskData] = {
    +    val defaultOptionString: String = "d"
    +    // The task metrics dummy object below has been added to avoid 
throwing exception in cases
    +    // when task metrics for a particular task do not exist as of yet
    +    val dummyTaskMetrics: TaskMetrics = new TaskMetrics(0, 0, 0, 0, 0, 0, 
0, 0, 0, 0,
    +      new InputMetrics(0, 0), new OutputMetrics(0, 0),
    +      new ShuffleReadMetrics(0, 0, 0, 0, 0, 0, 0), new 
ShuffleWriteMetrics(0, 0, 0))
    +    val searchValueLowerCase = searchValue.toLowerCase(Locale.ROOT)
    +    val filteredTaskDataSequence: Seq[TaskData] = taskDataList.filter(f =>
    --- End diff --
    
    Done


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to