This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e29aa12ef9 REALTIME Table add purge feature (#9568)
e29aa12ef9 is described below

commit e29aa12ef96a267a533856ec01300335ef7c46f8
Author: Airliquide76 <[email protected]>
AuthorDate: Tue Jan 17 22:20:14 2023 +0100

    REALTIME Table add purge feature (#9568)
---
 .../minion/tasks/purge/PurgeTaskGenerator.java     | 35 +++++++++++++---------
 1 file changed, 21 insertions(+), 14 deletions(-)

diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
index ff73ff7197..c5cbe4de6e 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
@@ -36,6 +36,7 @@ import org.apache.pinot.spi.annotations.minion.TaskGenerator;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,11 +60,6 @@ public class PurgeTaskGenerator extends BaseTaskGenerator {
     for (TableConfig tableConfig : tableConfigs) {
 
       String tableName = tableConfig.getTableName();
-      if (tableConfig.getTableType() == TableType.REALTIME) {
-        LOGGER.warn("Skip generating task: {} for real-time table: {}", taskType, tableName);
-        continue;
-      }
-
       Map<String, String> taskConfigs;
       TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
       if (tableTaskConfig == null) {
@@ -92,18 +88,30 @@ public class PurgeTaskGenerator extends BaseTaskGenerator {
       } else {
         tableMaxNumTasks = Integer.MAX_VALUE;
       }
-      List<SegmentZKMetadata> offlineSegmentsZKMetadata = _clusterInfoAccessor.getSegmentsZKMetadata(tableName);
+      List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>();
+      if (tableConfig.getTableType() == TableType.REALTIME) {
+        List<SegmentZKMetadata> segmentsZKMetadataAll = _clusterInfoAccessor.getSegmentsZKMetadata(tableName);
+        for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadataAll) {
+          CommonConstants.Segment.Realtime.Status status = segmentZKMetadata.getStatus();
+          if (status.isCompleted()) {
+            segmentsZKMetadata.add(segmentZKMetadata);
+          }
+        }
+      } else {
+        segmentsZKMetadata = _clusterInfoAccessor.getSegmentsZKMetadata(tableName);
+      }
+
       List<SegmentZKMetadata> purgedSegmentsZKMetadata = new ArrayList<>();
       List<SegmentZKMetadata> notpurgedSegmentsZKMetadata = new ArrayList<>();
 
-      for (SegmentZKMetadata segmentMetadata: offlineSegmentsZKMetadata) {
+      for (SegmentZKMetadata segmentMetadata : segmentsZKMetadata) {
 
-       if (segmentMetadata.getCustomMap() != null && segmentMetadata.getCustomMap().containsKey(
-           MinionConstants.PurgeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX)) {
-         purgedSegmentsZKMetadata.add(segmentMetadata);
-       } else {
-         notpurgedSegmentsZKMetadata.add(segmentMetadata);
-       }
+        if (segmentMetadata.getCustomMap() != null && segmentMetadata.getCustomMap()
+            .containsKey(MinionConstants.PurgeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX)) {
+          purgedSegmentsZKMetadata.add(segmentMetadata);
+        } else {
+          notpurgedSegmentsZKMetadata.add(segmentMetadata);
+        }
       }
       Collections.sort(purgedSegmentsZKMetadata, Comparator.comparing(
           segmentZKMetadata -> segmentZKMetadata.getCustomMap()
@@ -111,7 +119,6 @@ public class PurgeTaskGenerator extends BaseTaskGenerator {
           Comparator.nullsFirst(Comparator.naturalOrder())));
       //add already purged segment at the end
       notpurgedSegmentsZKMetadata.addAll(purgedSegmentsZKMetadata);
-
       int tableNumTasks = 0;
       Set<Segment> runningSegments =
           TaskGeneratorUtils.getRunningSegments(MinionConstants.PurgeTask.TASK_TYPE, _clusterInfoAccessor);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to