This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e29aa12ef9 REALTIME Table add purge feature (#9568)
e29aa12ef9 is described below
commit e29aa12ef96a267a533856ec01300335ef7c46f8
Author: Airliquide76 <[email protected]>
AuthorDate: Tue Jan 17 22:20:14 2023 +0100
REALTIME Table add purge feature (#9568)
---
.../minion/tasks/purge/PurgeTaskGenerator.java | 35 +++++++++++++---------
1 file changed, 21 insertions(+), 14 deletions(-)
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
index ff73ff7197..c5cbe4de6e 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
@@ -36,6 +36,7 @@ import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,11 +60,6 @@ public class PurgeTaskGenerator extends BaseTaskGenerator {
for (TableConfig tableConfig : tableConfigs) {
String tableName = tableConfig.getTableName();
- if (tableConfig.getTableType() == TableType.REALTIME) {
- LOGGER.warn("Skip generating task: {} for real-time table: {}",
taskType, tableName);
- continue;
- }
-
Map<String, String> taskConfigs;
TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
if (tableTaskConfig == null) {
@@ -92,18 +88,30 @@ public class PurgeTaskGenerator extends BaseTaskGenerator {
} else {
tableMaxNumTasks = Integer.MAX_VALUE;
}
- List<SegmentZKMetadata> offlineSegmentsZKMetadata =
_clusterInfoAccessor.getSegmentsZKMetadata(tableName);
+ List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>();
+ if (tableConfig.getTableType() == TableType.REALTIME) {
+ List<SegmentZKMetadata> segmentsZKMetadataAll =
_clusterInfoAccessor.getSegmentsZKMetadata(tableName);
+ for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadataAll) {
+ CommonConstants.Segment.Realtime.Status status =
segmentZKMetadata.getStatus();
+ if (status.isCompleted()) {
+ segmentsZKMetadata.add(segmentZKMetadata);
+ }
+ }
+ } else {
+ segmentsZKMetadata =
_clusterInfoAccessor.getSegmentsZKMetadata(tableName);
+ }
+
List<SegmentZKMetadata> purgedSegmentsZKMetadata = new ArrayList<>();
List<SegmentZKMetadata> notpurgedSegmentsZKMetadata = new ArrayList<>();
- for (SegmentZKMetadata segmentMetadata: offlineSegmentsZKMetadata) {
+ for (SegmentZKMetadata segmentMetadata : segmentsZKMetadata) {
- if (segmentMetadata.getCustomMap() != null &&
segmentMetadata.getCustomMap().containsKey(
- MinionConstants.PurgeTask.TASK_TYPE +
MinionConstants.TASK_TIME_SUFFIX)) {
- purgedSegmentsZKMetadata.add(segmentMetadata);
- } else {
- notpurgedSegmentsZKMetadata.add(segmentMetadata);
- }
+ if (segmentMetadata.getCustomMap() != null &&
segmentMetadata.getCustomMap()
+ .containsKey(MinionConstants.PurgeTask.TASK_TYPE +
MinionConstants.TASK_TIME_SUFFIX)) {
+ purgedSegmentsZKMetadata.add(segmentMetadata);
+ } else {
+ notpurgedSegmentsZKMetadata.add(segmentMetadata);
+ }
}
Collections.sort(purgedSegmentsZKMetadata, Comparator.comparing(
segmentZKMetadata -> segmentZKMetadata.getCustomMap()
@@ -111,7 +119,6 @@ public class PurgeTaskGenerator extends BaseTaskGenerator {
Comparator.nullsFirst(Comparator.naturalOrder())));
//add already purged segment at the end
notpurgedSegmentsZKMetadata.addAll(purgedSegmentsZKMetadata);
-
int tableNumTasks = 0;
Set<Segment> runningSegments =
TaskGeneratorUtils.getRunningSegments(MinionConstants.PurgeTask.TASK_TYPE,
_clusterInfoAccessor);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]