kfaraz commented on code in PR #12404:
URL: https://github.com/apache/druid/pull/12404#discussion_r881312942


##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java:
##########
@@ -391,6 +571,109 @@ private String 
getWhereClauseForActiveStatusesQuery(String dataSource)
     return sql;
   }
 
+  class TaskMetadataInfoMapperFromPayload implements 
ResultSetMapper<TaskInfo<TaskMetadata, StatusType>>

Review Comment:
   I think this can be private.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java:
##########
@@ -391,6 +571,109 @@ private String 
getWhereClauseForActiveStatusesQuery(String dataSource)
     return sql;
   }
 
+  class TaskMetadataInfoMapperFromPayload implements 
ResultSetMapper<TaskInfo<TaskMetadata, StatusType>>
+  {
+    private final ObjectMapper objectMapper;
+
+    TaskMetadataInfoMapperFromPayload(ObjectMapper objectMapper)
+    {
+      this.objectMapper = objectMapper;
+    }
+
+    @Override
+    public TaskInfo<TaskMetadata, StatusType> map(int index, ResultSet 
resultSet, StatementContext context)
+        throws SQLException
+    {
+      return toTaskMetadataInfo(objectMapper, resultSet, true);
+    }
+  }
+
+  class TaskMetadataInfoMapper implements 
ResultSetMapper<TaskInfo<TaskMetadata, StatusType>>

Review Comment:
   This can be private.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java:
##########
@@ -391,6 +571,109 @@ private String 
getWhereClauseForActiveStatusesQuery(String dataSource)
     return sql;
   }
 
+  class TaskMetadataInfoMapperFromPayload implements 
ResultSetMapper<TaskInfo<TaskMetadata, StatusType>>
+  {
+    private final ObjectMapper objectMapper;
+
+    TaskMetadataInfoMapperFromPayload(ObjectMapper objectMapper)
+    {
+      this.objectMapper = objectMapper;
+    }
+
+    @Override
+    public TaskInfo<TaskMetadata, StatusType> map(int index, ResultSet 
resultSet, StatementContext context)
+        throws SQLException
+    {
+      return toTaskMetadataInfo(objectMapper, resultSet, true);
+    }
+  }
+
+  class TaskMetadataInfoMapper implements 
ResultSetMapper<TaskInfo<TaskMetadata, StatusType>>
+  {
+    private final ObjectMapper objectMapper;
+
+    TaskMetadataInfoMapper(ObjectMapper objectMapper)
+    {
+      this.objectMapper = objectMapper;
+    }
+
+    @Override
+    public TaskInfo<TaskMetadata, StatusType> map(int index, ResultSet 
resultSet, StatementContext context)
+        throws SQLException
+    {
+      return toTaskMetadataInfo(objectMapper, resultSet, false);
+    }
+  }
+
+  private TaskInfo<TaskMetadata, StatusType> toTaskMetadataInfo(ObjectMapper 
objectMapper,
+                                                                       
ResultSet resultSet,
+                                                                       boolean 
usePayload
+  ) throws SQLException
+  {
+    String type;
+    String groupId;
+    if (usePayload) {
+      try {
+        ObjectNode payload = 
objectMapper.readValue(resultSet.getBytes("payload"), ObjectNode.class);
+        type = payload.get("type").asText();
+        groupId = payload.get("groupId").asText();
+      }

Review Comment:
   As the generic nature of these contracts is kind of already being broken by 
these newly added methods, I suppose you could just return a `TaskStatusPlus` 
instead of disguising it as a `TaskInfo`.
   
   Read the `status_payload` as a `TaskStatus` and extract the items that you 
need.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java:
##########
@@ -391,6 +571,109 @@ private String 
getWhereClauseForActiveStatusesQuery(String dataSource)
     return sql;
   }
 
+  class TaskMetadataInfoMapperFromPayload implements 
ResultSetMapper<TaskInfo<TaskMetadata, StatusType>>
+  {
+    private final ObjectMapper objectMapper;
+
+    TaskMetadataInfoMapperFromPayload(ObjectMapper objectMapper)
+    {
+      this.objectMapper = objectMapper;
+    }
+
+    @Override
+    public TaskInfo<TaskMetadata, StatusType> map(int index, ResultSet 
resultSet, StatementContext context)
+        throws SQLException
+    {
+      return toTaskMetadataInfo(objectMapper, resultSet, true);
+    }
+  }
+
+  class TaskMetadataInfoMapper implements 
ResultSetMapper<TaskInfo<TaskMetadata, StatusType>>
+  {
+    private final ObjectMapper objectMapper;
+
+    TaskMetadataInfoMapper(ObjectMapper objectMapper)
+    {
+      this.objectMapper = objectMapper;
+    }
+
+    @Override
+    public TaskInfo<TaskMetadata, StatusType> map(int index, ResultSet 
resultSet, StatementContext context)
+        throws SQLException
+    {
+      return toTaskMetadataInfo(objectMapper, resultSet, false);
+    }
+  }
+
+  private TaskInfo<TaskMetadata, StatusType> toTaskMetadataInfo(ObjectMapper 
objectMapper,
+                                                                       
ResultSet resultSet,
+                                                                       boolean 
usePayload
+  ) throws SQLException
+  {
+    String type;
+    String groupId;
+    if (usePayload) {
+      try {
+        ObjectNode payload = 
objectMapper.readValue(resultSet.getBytes("payload"), ObjectNode.class);
+        type = payload.get("type").asText();
+        groupId = payload.get("groupId").asText();
+      }

Review Comment:
   It would be cleaner to just fix the class `TaskMetadata` to be jackson 
deserializable and just do something similar to what is happening for the 
`status_payload` later in this method rather than using `ObjectNode`.



##########
core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java:
##########
@@ -99,6 +103,54 @@ List<TaskInfo<EntryType, StatusType>> getTaskInfos(
       @Nullable String datasource
   );
 
+  /**
+   * This is the recommended method to fetch Tasks for the task view
+   * This utilizes the new type and group_id columns and should be utilized 
after migration
+   * Returns a list of TaskInfo for the tasks corresponding to the given 
filters
+   * The TaskInfo comprises the TaskMetadata which is significantly smaller 
than a Task, and the TaskStatus
+   * These are sufficient to create the TaskStatusPlus for a given Task, and 
prevent unnecessary memory usage
+   *
+   * If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns 
all active tasks in the metadata store.
+   * If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it 
returns all complete tasks in the metadata
+   * store. For complete tasks, additional filters in {@code 
CompleteTaskLookup} can be applied.
+   * All lookups should be processed atomically if more than one lookup is 
given.
+   *
+   * fetchPayload determines the query used to fetch from the tasks table
+   * If true, fetch the payload and deserialize it to obtain the above fields
+   * Else, use the newly created type and group_id columns in the query for 
task summaries

Review Comment:
   ```suggestion
      * Returns the statuses of the specified tasks. Implementations of this 
method must not
      * read the task payload from the underlying storage as it may increase 
memory usage.
      * 
      * If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it 
returns all active tasks in the metadata store.
      * If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it 
returns all complete tasks in the metadata
      * store. For complete tasks, additional filters in {@code 
CompleteTaskLookup} can be applied.
      * All lookups should be processed atomically if more than one lookup is 
given.
   ```



##########
core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java:
##########
@@ -99,6 +103,54 @@ List<TaskInfo<EntryType, StatusType>> getTaskInfos(
       @Nullable String datasource
   );
 
+  /**
+   * This is the recommended method to fetch Tasks for the task view
+   * This utilizes the new type and group_id columns and should be utilized 
after migration
+   * Returns a list of TaskInfo for the tasks corresponding to the given 
filters
+   * The TaskInfo comprises the TaskMetadata which is significantly smaller 
than a Task, and the TaskStatus
+   * These are sufficient to create the TaskStatusPlus for a given Task, and 
prevent unnecessary memory usage
+   *
+   * If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns 
all active tasks in the metadata store.
+   * If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it 
returns all complete tasks in the metadata
+   * store. For complete tasks, additional filters in {@code 
CompleteTaskLookup} can be applied.
+   * All lookups should be processed atomically if more than one lookup is 
given.
+   *
+   * fetchPayload determines the query used to fetch from the tasks table
+   * If true, fetch the payload and deserialize it to obtain the above fields
+   * Else, use the newly created type and group_id columns in the query for 
task summaries
+   *
+   * @param taskLookups task lookup type and filters.
+   * @param datasource  datasource filter
+   */
+  List<TaskInfo<TaskMetadata, StatusType>> getTaskMetadataInfos(

Review Comment:
   These new methods should be called `getTaskStatus` so that the intent is 
clear. `getTaskMetadataInfos` is a little vague.



##########
core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java:
##########
@@ -99,6 +103,54 @@ List<TaskInfo<EntryType, StatusType>> getTaskInfos(
       @Nullable String datasource
   );
 
+  /**
+   * This is the recommended method to fetch Tasks for the task view
+   * This utilizes the new type and group_id columns and should be utilized 
after migration
+   * Returns a list of TaskInfo for the tasks corresponding to the given 
filters
+   * The TaskInfo comprises the TaskMetadata which is significantly smaller 
than a Task, and the TaskStatus
+   * These are sufficient to create the TaskStatusPlus for a given Task, and 
prevent unnecessary memory usage
+   *
+   * If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns 
all active tasks in the metadata store.
+   * If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it 
returns all complete tasks in the metadata
+   * store. For complete tasks, additional filters in {@code 
CompleteTaskLookup} can be applied.
+   * All lookups should be processed atomically if more than one lookup is 
given.
+   *
+   * fetchPayload determines the query used to fetch from the tasks table
+   * If true, fetch the payload and deserialize it to obtain the above fields
+   * Else, use the newly created type and group_id columns in the query for 
task summaries
+   *
+   * @param taskLookups task lookup type and filters.
+   * @param datasource  datasource filter
+   */
+  List<TaskInfo<TaskMetadata, StatusType>> getTaskMetadataInfos(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String datasource
+  );
+
+  /**
+   * Please use this method to fetch task for viewing on ingestion tab only 
before task migration
+   * This deserializes the payload column to get the required fields, and has 
a greater overhead
+   * Returns a list of TaskInfo for the tasks corresponding to the given 
filters
+   * The TaskInfo comprises the TaskMetadata which is significantly smaller 
than a Task, and the TaskStatus
+   * These are sufficient to create the TaskStatusPlus for a given Task, and 
prevent unnecessary memory usage
+   *
+   * If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns 
all active tasks in the metadata store.
+   * If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it 
returns all complete tasks in the metadata
+   * store. For complete tasks, additional filters in {@code 
CompleteTaskLookup} can be applied.
+   * All lookups should be processed atomically if more than one lookup is 
given.
+   *
+   * fetchPayload determines the query used to fetch from the tasks table
+   * If true, fetch the payload and deserialize it to obtain the above fields
+   * Else, use the newly created type and group_id columns in the query for 
task summaries

Review Comment:
   ```suggestion
      * Returns the statuses of the specified tasks. Implementations of this 
method may
      * read from the corresponding task payloads to retrieve task information.
      *
      * This method is deprecated and {@link #getTaskStatus} should be used 
instead.
      *
      * If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it 
returns all active tasks in the metadata store.
      * If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it 
returns all complete tasks in the metadata
      * store. For complete tasks, additional filters in {@code 
CompleteTaskLookup} can be applied.
      * All lookups should be processed atomically if more than one lookup is 
given.
   ```



##########
core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java:
##########
@@ -99,6 +103,54 @@ List<TaskInfo<EntryType, StatusType>> getTaskInfos(
       @Nullable String datasource
   );
 
+  /**
+   * This is the recommended method to fetch Tasks for the task view
+   * This utilizes the new type and group_id columns and should be utilized 
after migration
+   * Returns a list of TaskInfo for the tasks corresponding to the given 
filters
+   * The TaskInfo comprises the TaskMetadata which is significantly smaller 
than a Task, and the TaskStatus
+   * These are sufficient to create the TaskStatusPlus for a given Task, and 
prevent unnecessary memory usage
+   *
+   * If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns 
all active tasks in the metadata store.
+   * If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it 
returns all complete tasks in the metadata
+   * store. For complete tasks, additional filters in {@code 
CompleteTaskLookup} can be applied.
+   * All lookups should be processed atomically if more than one lookup is 
given.
+   *
+   * fetchPayload determines the query used to fetch from the tasks table
+   * If true, fetch the payload and deserialize it to obtain the above fields
+   * Else, use the newly created type and group_id columns in the query for 
task summaries
+   *
+   * @param taskLookups task lookup type and filters.
+   * @param datasource  datasource filter
+   */
+  List<TaskInfo<TaskMetadata, StatusType>> getTaskMetadataInfos(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String datasource
+  );
+
+  /**
+   * Please use this method to fetch task for viewing on ingestion tab only 
before task migration
+   * This deserializes the payload column to get the required fields, and has 
a greater overhead
+   * Returns a list of TaskInfo for the tasks corresponding to the given 
filters
+   * The TaskInfo comprises the TaskMetadata which is significantly smaller 
than a Task, and the TaskStatus
+   * These are sufficient to create the TaskStatusPlus for a given Task, and 
prevent unnecessary memory usage
+   *
+   * If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns 
all active tasks in the metadata store.
+   * If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it 
returns all complete tasks in the metadata
+   * store. For complete tasks, additional filters in {@code 
CompleteTaskLookup} can be applied.
+   * All lookups should be processed atomically if more than one lookup is 
given.
+   *
+   * fetchPayload determines the query used to fetch from the tasks table
+   * If true, fetch the payload and deserialize it to obtain the above fields
+   * Else, use the newly created type and group_id columns in the query for 
task summaries
+   *
+   * @param taskLookups task lookup type and filters.
+   * @param datasource  datasource filter
+   */
+  List<TaskInfo<TaskMetadata, StatusType>> getTaskMetadataInfosFromPayload(

Review Comment:
   Mark this as deprecated.
   Rename to `getTaskStatusWithPayload`



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java:
##########
@@ -391,6 +571,109 @@ private String 
getWhereClauseForActiveStatusesQuery(String dataSource)
     return sql;
   }
 
+  class TaskMetadataInfoMapperFromPayload implements 
ResultSetMapper<TaskInfo<TaskMetadata, StatusType>>
+  {
+    private final ObjectMapper objectMapper;
+
+    TaskMetadataInfoMapperFromPayload(ObjectMapper objectMapper)
+    {
+      this.objectMapper = objectMapper;
+    }
+
+    @Override
+    public TaskInfo<TaskMetadata, StatusType> map(int index, ResultSet 
resultSet, StatementContext context)
+        throws SQLException
+    {
+      return toTaskMetadataInfo(objectMapper, resultSet, true);
+    }
+  }
+
+  class TaskMetadataInfoMapper implements 
ResultSetMapper<TaskInfo<TaskMetadata, StatusType>>
+  {
+    private final ObjectMapper objectMapper;
+
+    TaskMetadataInfoMapper(ObjectMapper objectMapper)
+    {
+      this.objectMapper = objectMapper;
+    }
+
+    @Override
+    public TaskInfo<TaskMetadata, StatusType> map(int index, ResultSet 
resultSet, StatementContext context)
+        throws SQLException
+    {
+      return toTaskMetadataInfo(objectMapper, resultSet, false);
+    }
+  }
+
+  private TaskInfo<TaskMetadata, StatusType> toTaskMetadataInfo(ObjectMapper 
objectMapper,
+                                                                       
ResultSet resultSet,
+                                                                       boolean 
usePayload
+  ) throws SQLException

Review Comment:
   Nit: formatting.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java:
##########
@@ -311,7 +325,126 @@ public List<TaskInfo<EntryType, StatusType>> getTaskInfos(
     );
   }
 
-  protected Query<Map<String, Object>> createCompletedTaskInfoQuery(
+  @Override
+  public List<TaskInfo<TaskMetadata, StatusType>> 
getTaskMetadataInfosFromPayload(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String dataSource
+  )
+  {
+    return getTaskMetadataInfos(taskLookups, dataSource, true);
+  }
+
+  @Override
+  public List<TaskInfo<TaskMetadata, StatusType>> getTaskMetadataInfos(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String dataSource
+  )
+  {
+    return getTaskMetadataInfos(taskLookups, dataSource, false);
+  }
+
+  public List<TaskInfo<TaskMetadata, StatusType>> getTaskMetadataInfos(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String dataSource,
+      boolean fetchPayload
+  )
+  {
+    ResultSetMapper<TaskInfo<TaskMetadata, StatusType>> resultSetMapper =
+        fetchPayload ? taskMetadataInfoMapperFromPayload : 
taskMetadataInfoMapper;
+    return getConnector().retryTransaction(
+        (handle, status) -> {
+          final List<TaskInfo<TaskMetadata, StatusType>> taskMetadataInfos = 
new ArrayList<>();
+          for (Entry<TaskLookupType, TaskLookup> entry : 
taskLookups.entrySet()) {
+            final Query<Map<String, Object>> query;
+            switch (entry.getKey()) {
+              case ACTIVE:
+                query = !fetchPayload
+                        ? createActiveTaskSummaryStreamingQuery(handle, 
dataSource)
+                        : createActiveTaskStreamingQuery(handle, dataSource);
+                taskMetadataInfos.addAll(query.map(resultSetMapper).list());
+                break;
+              case COMPLETE:
+                CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) 
entry.getValue();
+                DateTime priorTo = completeTaskLookup.getTasksCreatedPriorTo();
+                Integer limit = completeTaskLookup.getMaxTaskStatuses();
+                query = !fetchPayload
+                        ? createCompletedTaskSummaryStreamingQuery(handle, 
priorTo, limit, dataSource)
+                        : createCompletedTaskStreamingQuery(handle, priorTo, 
limit, dataSource);
+                taskMetadataInfos.addAll(query.map(resultSetMapper).list());
+                break;
+              default:
+                throw new IAE("Unknown TaskLookupType: [%s]", entry.getKey());
+            }
+          }
+          for (TaskInfo<TaskMetadata, StatusType> taskMetadataInfo : 
taskMetadataInfos) {
+            System.out.println(taskMetadataInfo.getTask());
+          }
+          System.out.println("-------------------");
+          return taskMetadataInfos;
+        },
+        3,
+        SQLMetadataConnector.DEFAULT_MAX_TRIES
+    );
+  }
+
+  /**
+   * Fetches the columns needed to build TaskStatusPlus for completed tasks
+   * Please note that this requires completion of data migration to avoid 
empty values for task type and groupId
+   * Recommended for GET /tasks API
+   * Uses streaming SQL query to avoid fetching too many rows at once into 
memory
+   * @param handle db handle
+   * @param dataSource datasource to which the tasks belong. null if we don't 
want to filter
+   * @return Query object for TaskStatusPlus for completed tasks of interest
+   */
+  protected Query<Map<String, Object>> 
createCompletedTaskSummaryStreamingQuery(
+      Handle handle,
+      DateTime timestamp,
+      @Nullable Integer maxNumStatuses,
+      @Nullable String dataSource
+  )
+  {
+    String sql = StringUtils.format(
+        "SELECT "
+        + "  id, "
+        + "  created_date, "
+        + "  datasource, "
+        + "  group_id, "
+        + "  type, "
+        + "  status_payload "
+        + "FROM "
+        + "  %s "
+        + "WHERE "
+        + getWhereClauseForInactiveStatusesSinceQuery(dataSource)
+        + "ORDER BY created_date DESC",
+        getEntryTable()
+    );
+
+    if (maxNumStatuses != null) {
+      sql = decorateSqlWithLimit(sql);
+    }
+    Query<Map<String, Object>> query = handle.createQuery(sql)
+                                             .bind("start", 
timestamp.toString())
+                                             
.setFetchSize(connector.getStreamingFetchSize());
+
+    if (maxNumStatuses != null) {
+      query = query.bind("n", maxNumStatuses);
+    }
+    if (dataSource != null) {
+      query = query.bind("ds", dataSource);
+    }
+    return query;
+  }
+
+  /**
+   * Fetches the columns needed to build a Task object with payload for 
completed tasks
+   * This requires the task payload which can be large. Please use only when 
necessary.
+   * For example for ingestion tasks view before migration of the new columns
+   * Uses streaming SQL query to avoid fetching too many rows at once into 
memory
+   * @param handle db handle
+   * @param dataSource datasource to which the tasks belong. null if we don't 
want to filter
+   * @return Query object for completed TaskInfos of interest
+   */
+  protected Query<Map<String, Object>> createCompletedTaskStreamingQuery(

Review Comment:
   These query builder methods can probably be private.
   Also see if it is possible to combine any of these methods as it seems that 
the queries are mostly repeated.



##########
core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java:
##########
@@ -49,10 +52,11 @@ void insert(
       @NotNull String dataSource,
       @NotNull EntryType entry,
       boolean active,
-      @Nullable StatusType status
+      @Nullable StatusType status,
+      @NotNull String type,
+      @NotNull String groupId

Review Comment:
   Nit: These arguments should probably come earlier, maybe right after `id`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to