kfaraz commented on code in PR #12404:
URL: https://github.com/apache/druid/pull/12404#discussion_r878686022


##########
core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java:
##########
@@ -252,4 +254,29 @@ public String toString()
            ", errorMsg='" + errorMsg + '\'' +
            '}';
   }
+
+  /**
+   * Convert a TaskInfo pair of TaskMetadata and TaskStatus to a TaskStatusPlus

Review Comment:
   Nit: TaskInfo is not exactly a `Pair` even though there are similarities.



##########
core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java:
##########
@@ -173,4 +225,13 @@ default List<TaskInfo<EntryType, StatusType>> getTaskInfos(
    */
   @Nullable
   Long getLockId(String entryId, LockType lock);
+
+  /**
+   * Utility to migrate existing tasks to the new schema
+   *
+   * To be kicked off in a separate thread at MetadataTaskStorage startup.
+   * @param tasksTable
+   * @return
+   */
+  boolean migrateTaskTable(String tasksTable);

Review Comment:
   Method name is too vague. Please rename it to reflect what it actually does, e.g. `populateTaskTypeAndGroupId`.
   
   Why should `tasksTable` be passed here? You can use the `entryTable` field inside `SQLMetadataStorageActionHandler` if needed.



##########
core/src/main/java/org/apache/druid/indexer/TaskMetadata.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Model class containing the fields relevant to view tasks in the ingestion tab.
+ * These fields are extracted from the task payload for the new schema and this model can be used for migration as well.
+ */
+public class TaskMetadata

Review Comment:
   `TaskMetadata` is a little misleading. A better name for this class would be `TaskIdentifier` or simply `TaskId`.



##########
core/src/main/java/org/apache/druid/indexer/TaskMetadata.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Model class containing the fields relevant to view tasks in the ingestion tab.
+ * These fields are extracted from the task payload for the new schema and this model can be used for migration as well.

Review Comment:
   This information is not really needed.
   
   In most cases, the javadoc of a model class does not need to explain how the fields are populated or where they are read, unless that is something which must be kept in mind to use the class correctly. Here, the fields are pretty basic and self-explanatory.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -350,6 +377,43 @@ tableName, getPayloadType(), getCollation()
     );
   }
 
+  public void alterEntryTable(final String tableName)
+  {
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle)
+            {
+              final Batch batch = handle.createBatch();
+              if (!tableContainsColumn(handle, tableName, "type")) {
+                log.info("Adding column: type to table[%s]", tableName);
+                batch.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN type VARCHAR(255)", tableName));
+              }
+              if (!tableContainsColumn(handle, tableName, "group_id")) {
+                log.info("Adding column: group_id to table[%s]", tableName);
+                batch.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN group_id VARCHAR(255)", tableName));
+              }
+              batch.execute();

Review Comment:
   Shouldn't we just skip the `batch.execute()` if no ALTER is required?
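To illustrate the suggested guard, here is a minimal self-contained sketch (not the Druid code: the batch is modeled as a plain list of SQL strings, and `missingColumnAlters` is a hypothetical helper name):

```java
import java.util.ArrayList;
import java.util.List;

public class AlterBatchSketch
{
  /**
   * Collects only the ALTER statements that are actually needed.
   * The caller creates and executes a batch only when this list is non-empty.
   */
  public static List<String> missingColumnAlters(String tableName, List<String> existingColumns)
  {
    final List<String> statements = new ArrayList<>();
    if (!existingColumns.contains("type")) {
      statements.add(String.format("ALTER TABLE %1$s ADD COLUMN type VARCHAR(255)", tableName));
    }
    if (!existingColumns.contains("group_id")) {
      statements.add(String.format("ALTER TABLE %1$s ADD COLUMN group_id VARCHAR(255)", tableName));
    }
    return statements;
  }

  public static void main(String[] args)
  {
    // Both columns already present: nothing to alter, so no batch.execute() call at all.
    List<String> alters = missingColumnAlters("druid_tasks", List.of("id", "type", "group_id"));
    if (alters.isEmpty()) {
      System.out.println("skipping batch.execute(), nothing to alter");
    }
  }
}
```

With this shape, the caller would only create and execute a JDBI `Batch` when the returned list is non-empty, avoiding a pointless round trip when both columns already exist.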



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java:
##########
@@ -311,7 +325,126 @@ public List<TaskInfo<EntryType, StatusType>> getTaskInfos(
     );
   }
 
-  protected Query<Map<String, Object>> createCompletedTaskInfoQuery(
+  @Override
+  public List<TaskInfo<TaskMetadata, StatusType>> getTaskMetadataInfosFromPayload(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String dataSource
+  )
+  {
+    return getTaskMetadataInfos(taskLookups, dataSource, true);
+  }
+
+  @Override
+  public List<TaskInfo<TaskMetadata, StatusType>> getTaskMetadataInfos(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String dataSource
+  )
+  {
+    return getTaskMetadataInfos(taskLookups, dataSource, false);
+  }
+
+  public List<TaskInfo<TaskMetadata, StatusType>> getTaskMetadataInfos(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String dataSource,
+      boolean fetchPayload
+  )
+  {
+    ResultSetMapper<TaskInfo<TaskMetadata, StatusType>> resultSetMapper =
+        fetchPayload ? taskMetadataInfoMapperFromPayload : taskMetadataInfoMapper;
+    return getConnector().retryTransaction(
+        (handle, status) -> {
+          final List<TaskInfo<TaskMetadata, StatusType>> taskMetadataInfos = new ArrayList<>();
+          for (Entry<TaskLookupType, TaskLookup> entry : taskLookups.entrySet()) {
+            final Query<Map<String, Object>> query;
+            switch (entry.getKey()) {
+              case ACTIVE:
+                query = !fetchPayload
+                        ? createActiveTaskSummaryStreamingQuery(handle, dataSource)
+                        : createActiveTaskStreamingQuery(handle, dataSource);
+                taskMetadataInfos.addAll(query.map(resultSetMapper).list());
+                break;
+              case COMPLETE:
+                CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) entry.getValue();
+                DateTime priorTo = completeTaskLookup.getTasksCreatedPriorTo();
+                Integer limit = completeTaskLookup.getMaxTaskStatuses();
+                query = !fetchPayload
+                        ? createCompletedTaskSummaryStreamingQuery(handle, priorTo, limit, dataSource)
+                        : createCompletedTaskStreamingQuery(handle, priorTo, limit, dataSource);
+                taskMetadataInfos.addAll(query.map(resultSetMapper).list());
+                break;
+              default:
+                throw new IAE("Unknown TaskLookupType: [%s]", entry.getKey());
+            }
+          }
+          for (TaskInfo<TaskMetadata, StatusType> taskMetadataInfo : taskMetadataInfos) {
+            System.out.println(taskMetadataInfo.getTask());
+          }
+          System.out.println("-------------------");

Review Comment:
   Please remove all the sys outs.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java:
##########
@@ -311,7 +325,126 @@ public List<TaskInfo<EntryType, StatusType>> getTaskInfos(
     );
   }
 
-  protected Query<Map<String, Object>> createCompletedTaskInfoQuery(
+  @Override
+  public List<TaskInfo<TaskMetadata, StatusType>> getTaskMetadataInfosFromPayload(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String dataSource
+  )
+  {
+    return getTaskMetadataInfos(taskLookups, dataSource, true);
+  }
+
+  @Override
+  public List<TaskInfo<TaskMetadata, StatusType>> getTaskMetadataInfos(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String dataSource
+  )
+  {
+    return getTaskMetadataInfos(taskLookups, dataSource, false);
+  }
+
+  public List<TaskInfo<TaskMetadata, StatusType>> getTaskMetadataInfos(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String dataSource,
+      boolean fetchPayload
+  )
+  {
+    ResultSetMapper<TaskInfo<TaskMetadata, StatusType>> resultSetMapper =
+        fetchPayload ? taskMetadataInfoMapperFromPayload : taskMetadataInfoMapper;
+    return getConnector().retryTransaction(
+        (handle, status) -> {
+          final List<TaskInfo<TaskMetadata, StatusType>> taskMetadataInfos = new ArrayList<>();
+          for (Entry<TaskLookupType, TaskLookup> entry : taskLookups.entrySet()) {
+            final Query<Map<String, Object>> query;
+            switch (entry.getKey()) {
+              case ACTIVE:
+                query = !fetchPayload

Review Comment:
   Nit: use condition on `fetchPayload` rather than `!fetchPayload`



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java:
##########
@@ -679,4 +962,81 @@ public Long getLockId(String entryId, LockType lock)
                             .findAny()
                             .orElse(null);
   }
+
+  private List<TaskMetadata> fetchTaskMetadatas(String tableName, String id, int limit)
+  {
+    List<TaskMetadata> taskMetadatas = new ArrayList<>();
+    connector.retryWithHandle(
+        new HandleCallback<Void>()
+        {
+          @Override
+          public Void withHandle(Handle handle)
+          {
+            String sql = StringUtils.format(
+                "SELECT * FROM %1$s WHERE id > '%2$s' AND type IS null ORDER BY id %3$s",
+                tableName,
+                id,
+                connector.limitClause(limit)
+            );
+            Query<Map<String, Object>> query = handle.createQuery(sql);
+            taskMetadatas.addAll(query.map(taskMetadataMapper).list());
+            return null;
+          }
+        }
+    );
+    return taskMetadatas;
+  }
+
+  private void updateTaskMetadatas(String tasksTable, List<TaskMetadata> taskMetadatas)
+  {
+    connector.retryWithHandle(
+        new HandleCallback<Void>()
+        {
+          @Override
+          public Void withHandle(Handle handle)
+          {
+            Batch batch = handle.createBatch();
+            String sql = "UPDATE %1$s SET type = '%2$s', group_id = '%3$s' WHERE id = '%4$s'";
+            for (TaskMetadata metadata : taskMetadatas) {
+              batch.add(StringUtils.format(sql, tasksTable, metadata.getType(), metadata.getGroupId(), metadata.getId())
+              );
+            }
+            batch.execute();
+            return null;
+          }
+        }
+    );
+  }
+
+
+  @Override
+  public boolean migrateTaskTable(String tableName)
+  {
+    log.info("Populate fields task and group_id of task entry table [%s] from payload", tableName);
+    String id = "";
+    int limit = 100;
+    while (true) {
+      List<TaskMetadata> taskMetadatas;
+      try {
+        taskMetadatas = fetchTaskMetadatas(tableName, id, limit);
+      }
+      catch (Exception e) {
+        log.warn(e, "Task migration failed while reading entries from task table");
+        return false;
+      }
+      if (taskMetadatas.isEmpty()) {
+        break;
+      }
+      try {
+        updateTaskMetadatas(tableName, taskMetadatas);
+      }
+      catch (Exception e) {
+        log.warn(e, "Task migration failed while updating entries in task table");
+        return false;
+      }
+      id = taskMetadatas.get(taskMetadatas.size() - 1).getId();

Review Comment:
   In every iteration of the loop, it might be useful to add a debug log which prints the id up to which processing has been done so far.
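The suggestion could look like the following self-contained sketch of the keyset-paginated migration loop (the sorted id list and `fetchPage` are simplified stand-ins for the task table and `fetchTaskMetadatas`, not the Druid code):

```java
import java.util.ArrayList;
import java.util.List;

public class MigrationLoopSketch
{
  /** Stand-in for fetchTaskMetadatas: returns up to 'limit' ids strictly greater than 'afterId'. */
  static List<String> fetchPage(List<String> allIds, String afterId, int limit)
  {
    List<String> page = new ArrayList<>();
    for (String id : allIds) {
      if (id.compareTo(afterId) > 0) {
        page.add(id);
        if (page.size() == limit) {
          break;
        }
      }
    }
    return page;
  }

  /** Walks all pages, emitting the suggested per-iteration progress log. Returns the count processed. */
  public static int migrate(List<String> sortedIds, int limit)
  {
    String id = "";
    int processed = 0;
    while (true) {
      List<String> page = fetchPage(sortedIds, id, limit);
      if (page.isEmpty()) {
        break;
      }
      processed += page.size();
      id = page.get(page.size() - 1);
      // The debug log suggested in the review: record how far migration has progressed.
      System.out.println("Migrated task entries up to id[" + id + "]");
    }
    return processed;
  }
}
```

A log line per page makes it easy to tell where a long migration stalled or how far it got before a failure.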



##########
core/src/main/java/org/apache/druid/indexer/TaskMetadata.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Model class containing the fields relevant to view tasks in the ingestion tab.

Review Comment:
   This comment seems too specific to web-console. This javadoc should just explain the contents of this model class.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java:
##########
@@ -241,4 +243,20 @@ default <ContextValueType> ContextValueType getContextValue(String key, ContextV
     final ContextValueType value = getContextValue(key);
     return value == null ? defaultValue : value;
   }
+
+  default TaskMetadata getMetadata()
+  {
+    return new TaskMetadata(this.getId(), this.getGroupId(), this.getType());
+  }
+
+  static TaskInfo<TaskMetadata, TaskStatus> toTaskMetadataInfo(TaskInfo<Task, TaskStatus> taskInfo)
+  {
+    return new TaskInfo<>(
+        taskInfo.getId(),
+        taskInfo.getCreatedTime(),
+        taskInfo.getStatus(),
+        taskInfo.getDataSource(),
+        taskInfo.getTask().getMetadata()
+    );
+  }

Review Comment:
   TODO: not very comfortable with this.



##########
core/src/main/java/org/apache/druid/metadata/MetadataStorageConnector.java:
##########
@@ -87,5 +87,7 @@ default void exportTable(
 
   void createSupervisorsTable();
 
+  String getTaskTableName();

Review Comment:
   Shouldn't be required.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -327,6 +331,29 @@ tableName, getPayloadType()
         )
     );
   }
+  
+  public boolean tableContainsColumn(Handle handle, String table, String column)

Review Comment:
   Does this have to be public?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java:
##########
@@ -150,6 +151,21 @@
    */
   List<Task> getActiveTasksByDatasource(String datasource);
 
+  /**
+   * Returns tasks stored in the storage facility as a List of TaskStatusPlus
+   * particular order is guaranteed, but implementations are encouraged to return tasks in ascending order of creation.

Review Comment:
   ```suggestion
      * Returns the status of tasks in the metadata storage.
      * No particular order is guaranteed, but implementations are encouraged to return tasks in ascending order of creation.
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java:
##########
@@ -233,6 +234,18 @@ public List<TaskInfo<Task, TaskStatus>> getTaskInfos(
     return tasks;
   }
 
+  @Override
+  public List<TaskStatusPlus> getTaskStatusPlusList(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String datasource
+  )
+  {
+    return getTaskInfos(taskLookups, datasource).stream()
+                                                .map(Task::toTaskMetadataInfo)

Review Comment:
   It seems this would be a compilation error because `getTaskInfos` returns a List of `TaskInfo` objects and this mapper is treating it as a `Task`.
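To illustrate the type mismatch, here is a minimal self-contained sketch (the `TaskInfo`/`TaskStatusPlus` classes below are simplified stand-ins, not the Druid classes): a method reference passed to `Stream.map` must accept the stream's element type.

```java
import java.util.List;
import java.util.stream.Collectors;

public class MethodRefTypeSketch
{
  static class TaskInfo
  {
    final String id;
    TaskInfo(String id) { this.id = id; }
  }

  static class TaskStatusPlus
  {
    final String id;
    TaskStatusPlus(String id) { this.id = id; }
  }

  /** A converter that takes the stream element type (TaskInfo), as the compiler requires. */
  static TaskStatusPlus toStatusPlus(TaskInfo info)
  {
    return new TaskStatusPlus(info.id);
  }

  public static List<TaskStatusPlus> convert(List<TaskInfo> infos)
  {
    // MethodRefTypeSketch::toStatusPlus accepts TaskInfo, matching the stream's element
    // type; a method reference whose parameter type is Task would not compile here.
    return infos.stream().map(MethodRefTypeSketch::toStatusPlus).collect(Collectors.toList());
  }
}
```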



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -327,6 +331,29 @@ tableName, getPayloadType()
         )
     );
   }
+  
+  public boolean tableContainsColumn(Handle handle, String table, String column)
+  {
+    try {
+      DatabaseMetaData databaseMetaData = handle.getConnection().getMetaData();
+      ResultSet columns = databaseMetaData.getColumns(
+          null,
+          null,
+          table,
+          column
+      );
+      return columns.next();
+    }
+    catch (SQLException e) {
+      return false;
+    }
+  }
+  
+  public void prepareTaskEntryTable(final String tableName)

Review Comment:
   From an API perspective, it would be cleaner to just call `createEntryTable` which internally calls `alterEntryTable`. The consumers of this connector need not know if an alter is required or not.
   



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -350,6 +377,43 @@ tableName, getPayloadType(), getCollation()
     );
   }
 
+  public void alterEntryTable(final String tableName)
+  {
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()

Review Comment:
   Nit: use lambda instead as that is being used in other places in this class itself.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -350,6 +377,43 @@ tableName, getPayloadType(), getCollation()
     );
   }
 
+  public void alterEntryTable(final String tableName)
+  {
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle)
+            {
+              final Batch batch = handle.createBatch();
+              if (!tableContainsColumn(handle, tableName, "type")) {
+                log.info("Adding column: type to table[%s]", tableName);
+                batch.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN type VARCHAR(255)", tableName));
+              }
+              if (!tableContainsColumn(handle, tableName, "group_id")) {
+                log.info("Adding column: group_id to table[%s]", tableName);
+                batch.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN group_id VARCHAR(255)", tableName));
+              }
+              batch.execute();
+              return null;
+            }
+          }
+      );
+    }
+    catch (Exception e) {
+      log.warn(e, "Exception altering table");
+    }
+  }
+
+  @Override
+  public String getTaskTableName()

Review Comment:
   Shouldn't be needed.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java:
##########
@@ -745,87 +745,57 @@ private List<TaskStatusPlus> getTasks(
 
     if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING) {
       // We are interested in only those tasks which are in taskRunner.
-      taskInfoStreamFromTaskStorage = taskInfoStreamFromTaskStorage
-          .filter(info -> runnerWorkItems.containsKey(info.getId()));
+      taskStatusPlusStream = taskStatusPlusStream
+          .filter(statusPlus -> runnerWorkItems.containsKey(statusPlus.getId()));
     }
-    final List<TaskInfo<Task, TaskStatus>> taskInfoFromTaskStorage = taskInfoStreamFromTaskStorage
-        .collect(Collectors.toList());
+    final List<TaskStatusPlus> taskStatusPlusList = taskStatusPlusStream.collect(Collectors.toList());
 
     // Separate complete and active tasks from taskStorage.
     // Note that taskStorage can return only either complete tasks or active tasks per TaskLookupType.
-    final List<TaskInfo<Task, TaskStatus>> completeTaskInfoFromTaskStorage = new ArrayList<>();
-    final List<TaskInfo<Task, TaskStatus>> activeTaskInfoFromTaskStorage = new ArrayList<>();
-    for (TaskInfo<Task, TaskStatus> info : taskInfoFromTaskStorage) {
-      if (info.getStatus().isComplete()) {
-        completeTaskInfoFromTaskStorage.add(info);
+    final List<TaskStatusPlus> completeTaskStatusPlusList = new ArrayList<>();
+    final List<TaskStatusPlus> activeTaskStatusPlusList = new ArrayList<>();
+    for (TaskStatusPlus statusPlus : taskStatusPlusList) {
+      if (statusPlus.getStatusCode().isComplete()) {
+        completeTaskStatusPlusList.add(statusPlus);
       } else {
-        activeTaskInfoFromTaskStorage.add(info);
+        activeTaskStatusPlusList.add(statusPlus);
       }
     }
 
-    final List<TaskStatusPlus> statuses = new ArrayList<>();
-    completeTaskInfoFromTaskStorage.forEach(taskInfo -> statuses.add(
-        new TaskStatusPlus(
-            taskInfo.getId(),
-            taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(),
-            taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
-            taskInfo.getCreatedTime(),
-            DateTimes.EPOCH,
-            taskInfo.getStatus().getStatusCode(),
-            RunnerTaskState.NONE,
-            taskInfo.getStatus().getDuration(),
-            taskInfo.getStatus().getLocation(),
-            taskInfo.getDataSource(),
-            taskInfo.getStatus().getErrorMsg()
-        )
-    ));
+    final List<TaskStatusPlus> taskStatuses = new ArrayList<>(completeTaskStatusPlusList);
 
-    activeTaskInfoFromTaskStorage.forEach(taskInfo -> {
-      final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(taskInfo.getId());
+    activeTaskStatusPlusList.forEach(statusPlus -> {
+      final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(statusPlus.getId());
       if (runnerWorkItem == null) {
         // a task is assumed to be a waiting task if it exists in taskStorage but not in taskRunner.
         if (state == TaskStateLookup.WAITING || state == TaskStateLookup.ALL) {
-          statuses.add(
-              new TaskStatusPlus(
-                  taskInfo.getId(),
-                  taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(),
-                  taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
-                  taskInfo.getCreatedTime(),
-                  DateTimes.EPOCH,
-                  taskInfo.getStatus().getStatusCode(),
-                  RunnerTaskState.WAITING,
-                  taskInfo.getStatus().getDuration(),
-                  taskInfo.getStatus().getLocation(),
-                  taskInfo.getDataSource(),
-                  taskInfo.getStatus().getErrorMsg()
-              )
-          );
+          taskStatuses.add(statusPlus);
         }
       } else {
         if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING || state == TaskStateLookup.ALL) {
-          statuses.add(
+          taskStatuses.add(
               new TaskStatusPlus(
-                  taskInfo.getId(),
-                  taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(),
-                  taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
+                  statusPlus.getId(),
+                  statusPlus.getGroupId(),
+                  statusPlus.getType(),
                   runnerWorkItem.getCreatedTime(),
                   runnerWorkItem.getQueueInsertionTime(),
-                  taskInfo.getStatus().getStatusCode(),
-                  taskRunner.getRunnerTaskState(taskInfo.getId()), // this is racy for remoteTaskRunner
-                  taskInfo.getStatus().getDuration(),
+                  statusPlus.getStatusCode(),
+                  taskRunner.getRunnerTaskState(statusPlus.getId()), // this is racy for remoteTaskRunner
+                  statusPlus.getDuration(),
                   runnerWorkItem.getLocation(), // location in taskInfo is only updated after the task is done.
-                  taskInfo.getDataSource(),
-                  taskInfo.getStatus().getErrorMsg()
+                  statusPlus.getDataSource(),
+                  statusPlus.getErrorMsg()
               )
           );
         }
       }
     });
 
-    return statuses;
+    return taskStatuses;
   }
 
-  private Stream<TaskInfo<Task, TaskStatus>> getTaskInfoStreamFromTaskStorage(
+  private Stream<TaskStatusPlus> getTaskStatusPlusList(

Review Comment:
   This should just return a List and all the filtering should happen in the 
calling method.



##########
core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java:
##########
@@ -252,4 +254,29 @@ public String toString()
            ", errorMsg='" + errorMsg + '\'' +
            '}';
   }
+
+  /**
+   * Convert a TaskInfo pair of TaskMetadata and TaskStatus to a TaskStatusPlus
+   * Applicable only for completed or waiting tasks

Review Comment:
   This javadoc is probably not needed as this is just a conversion method. But if you must add it, please elaborate this part to clarify why it only applies to completed or waiting tasks.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java:
##########
@@ -311,7 +325,126 @@ public List<TaskInfo<EntryType, StatusType>> getTaskInfos(
     );
   }
 
-  protected Query<Map<String, Object>> createCompletedTaskInfoQuery(
+  @Override
+  public List<TaskInfo<TaskMetadata, StatusType>> getTaskMetadataInfosFromPayload(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String dataSource
+  )
+  {
+    return getTaskMetadataInfos(taskLookups, dataSource, true);
+  }
+
+  @Override
+  public List<TaskInfo<TaskMetadata, StatusType>> getTaskMetadataInfos(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String dataSource
+  )
+  {
+    return getTaskMetadataInfos(taskLookups, dataSource, false);
+  }
+
+  public List<TaskInfo<TaskMetadata, StatusType>> getTaskMetadataInfos(
+      Map<TaskLookupType, TaskLookup> taskLookups,
+      @Nullable String dataSource,
+      boolean fetchPayload
+  )
+  {
+    ResultSetMapper<TaskInfo<TaskMetadata, StatusType>> resultSetMapper =
+        fetchPayload ? taskMetadataInfoMapperFromPayload : taskMetadataInfoMapper;
+    return getConnector().retryTransaction(
+        (handle, status) -> {
+          final List<TaskInfo<TaskMetadata, StatusType>> taskMetadataInfos = new ArrayList<>();
+          for (Entry<TaskLookupType, TaskLookup> entry : taskLookups.entrySet()) {
+            final Query<Map<String, Object>> query;
+            switch (entry.getKey()) {
+              case ACTIVE:
+                query = !fetchPayload
+                        ? createActiveTaskSummaryStreamingQuery(handle, dataSource)
+                        : createActiveTaskStreamingQuery(handle, dataSource);
+                taskMetadataInfos.addAll(query.map(resultSetMapper).list());
+                break;
+              case COMPLETE:
+                CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) entry.getValue();
+                DateTime priorTo = completeTaskLookup.getTasksCreatedPriorTo();
+                Integer limit = completeTaskLookup.getMaxTaskStatuses();
+                query = !fetchPayload

Review Comment:
   Nit: use condition on `fetchPayload` rather than `!fetchPayload`



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -350,6 +377,43 @@ tableName, getPayloadType(), getCollation()
     );
   }
 
+  public void alterEntryTable(final String tableName)

Review Comment:
   Should not be public.
   
   Maybe rename to something like `checkAndAddEntryTableColumns` which reflects 
that the alter is optional and that new columns are being added.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -350,6 +381,87 @@ tableName, getPayloadType(), getCollation()
     );
   }
 
+  public void alterEntryTable(final String tableName)
+  {
+    try {
+      retryWithHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle)
+            {
+              if (!tableContainsColumn(handle, tableName, "type")) {

Review Comment:
   I am not sure if this is done yet. The code still seems to be checking `tableContainsColumn()` twice, once for group_id and once for type. They will either both be there or neither.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

