AmatyaAvadhanula commented on code in PR #12404:
URL: https://github.com/apache/druid/pull/12404#discussion_r897981516


##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java:
##########
@@ -311,7 +325,126 @@ public List<TaskInfo<EntryType, StatusType>> getTaskInfos(
     );
   }
 
-  protected Query<Map<String, Object>> createCompletedTaskInfoQuery(
+  @Override
+  public List<TaskInfo<TaskMetadata, StatusType>> getTaskMetadataInfosFromPayload(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String dataSource
+  )
+  {
+    return getTaskMetadataInfos(taskLookups, dataSource, true);
+  }
+
+  @Override
+  public List<TaskInfo<TaskMetadata, StatusType>> getTaskMetadataInfos(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String dataSource
+  )
+  {
+    return getTaskMetadataInfos(taskLookups, dataSource, false);
+  }
+
+  public List<TaskInfo<TaskMetadata, StatusType>> getTaskMetadataInfos(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String dataSource,
+      boolean fetchPayload
+  )
+  {
+    ResultSetMapper<TaskInfo<TaskMetadata, StatusType>> resultSetMapper =
+        fetchPayload ? taskMetadataInfoMapperFromPayload : taskMetadataInfoMapper;
+    return getConnector().retryTransaction(
+        (handle, status) -> {
+          final List<TaskInfo<TaskMetadata, StatusType>> taskMetadataInfos = new ArrayList<>();
+          for (Entry<TaskLookupType, TaskLookup> entry : taskLookups.entrySet()) {
+            final Query<Map<String, Object>> query;
+            switch (entry.getKey()) {
+              case ACTIVE:
+                query = !fetchPayload
+                        ? createActiveTaskSummaryStreamingQuery(handle, dataSource)
+                        : createActiveTaskStreamingQuery(handle, dataSource);
+                taskMetadataInfos.addAll(query.map(resultSetMapper).list());
+                break;
+              case COMPLETE:
+                CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) entry.getValue();
+                DateTime priorTo = completeTaskLookup.getTasksCreatedPriorTo();
+                Integer limit = completeTaskLookup.getMaxTaskStatuses();
+                query = !fetchPayload
+                        ? createCompletedTaskSummaryStreamingQuery(handle, priorTo, limit, dataSource)
+                        : createCompletedTaskStreamingQuery(handle, priorTo, limit, dataSource);
+                taskMetadataInfos.addAll(query.map(resultSetMapper).list());
+                break;
+              default:
+                throw new IAE("Unknown TaskLookupType: [%s]", entry.getKey());
+            }
+          }
+          for (TaskInfo<TaskMetadata, StatusType> taskMetadataInfo : taskMetadataInfos) {
+            System.out.println(taskMetadataInfo.getTask());
+          }
+          System.out.println("-------------------");
+          return taskMetadataInfos;
+        },
+        3,
+        SQLMetadataConnector.DEFAULT_MAX_TRIES
+    );
+  }
+
+  /**
+   * Fetches the columns needed to build TaskStatusPlus for completed tasks
+   * Please note that this requires completion of data migration to avoid empty values for task type and groupId
+   * Recommended for GET /tasks API
+   * Uses streaming SQL query to avoid fetching too many rows at once into memory
+   * @param handle db handle
+   * @param dataSource datasource to which the tasks belong. null if we don't want to filter
+   * @return Query object for TaskStatusPlus for completed tasks of interest
+   */
+  protected Query<Map<String, Object>> createCompletedTaskSummaryStreamingQuery(
+      Handle handle,
+      DateTime timestamp,
+      @Nullable Integer maxNumStatuses,
+      @Nullable String dataSource
+  )
+  {
+    String sql = StringUtils.format(
+        "SELECT "
+        + "  id, "
+        + "  created_date, "
+        + "  datasource, "
+        + "  group_id, "
+        + "  type, "
+        + "  status_payload "
+        + "FROM "
+        + "  %s "
+        + "WHERE "
+        + getWhereClauseForInactiveStatusesSinceQuery(dataSource)
+        + "ORDER BY created_date DESC",
+        getEntryTable()
+    );
+
+    if (maxNumStatuses != null) {
+      sql = decorateSqlWithLimit(sql);
+    }
+    Query<Map<String, Object>> query = handle.createQuery(sql)
+                                             .bind("start", timestamp.toString())
+                                             .setFetchSize(connector.getStreamingFetchSize());
+
+    if (maxNumStatuses != null) {
+      query = query.bind("n", maxNumStatuses);
+    }
+    if (dataSource != null) {
+      query = query.bind("ds", dataSource);
+    }
+    return query;
+  }
+
+  /**
+   * Fetches the columns needed to build a Task object with payload for completed tasks
+   * This requires the task payload which can be large. Please use only when necessary.
+   * For example for ingestion tasks view before migration of the new columns
+   * Uses streaming SQL query to avoid fetching too many rows at once into memory
+   * @param handle db handle
+   * @param dataSource datasource to which the tasks belong. null if we don't want to filter
+   * @return Query object for completed TaskInfos of interest
+   */
+  protected Query<Map<String, Object>> createCompletedTaskStreamingQuery(

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to