This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bad572ecb6 [MergeRollupTask] include partition info into segment name (#9815)
bad572ecb6 is described below
commit bad572ecb636d766862b048be47c9b6bfa8ab4ef
Author: Haitao Zhang <[email protected]>
AuthorDate: Thu Nov 17 18:11:23 2022 -0800
[MergeRollupTask] include partition info into segment name (#9815)
---
.../mergerollup/MergeRollupTaskGenerator.java | 32 ++++++++++++++++------
1 file changed, 24 insertions(+), 8 deletions(-)
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 0983e9f4cd..28043c2f60 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -105,6 +105,7 @@ public class MergeRollupTaskGenerator extends
BaseTaskGenerator {
private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000;
private static final int DEFAULT_NUM_PARALLEL_BUCKETS = 1;
private static final String REFRESH = "REFRESH";
+ private static final String DELIMITER_IN_SEGMENT_NAME = "_";
// This is the metric that keeps track of the task delay in the number of time buckets. For example, if we see this
// number to be 7 and merge task is configured with "bucketTimePeriod = 1d", this means that we have 7 days of
@@ -349,7 +350,7 @@ public class MergeRollupTaskGenerator extends
BaseTaskGenerator {
for (List<SegmentZKMetadata> selectedSegmentsPerBucket :
selectedSegmentsForAllBuckets) {
pinotTaskConfigsForTable.addAll(
createPinotTaskConfigs(selectedSegmentsPerBucket,
offlineTableName, maxNumRecordsPerTask, mergeLevel,
- mergeConfigs, taskConfigs));
+ null, mergeConfigs, taskConfigs));
}
} else {
// For partitioned table, schedule separate tasks for each
partitionId (partitionId is constructed from
@@ -383,16 +384,18 @@ public class MergeRollupTaskGenerator extends
BaseTaskGenerator {
}
}
- for (List<SegmentZKMetadata> partitionedSegments :
partitionToSegments.values()) {
+ for (Map.Entry<List<Integer>, List<SegmentZKMetadata>> entry :
partitionToSegments.entrySet()) {
+ List<Integer> partition = entry.getKey();
+ List<SegmentZKMetadata> partitionedSegments = entry.getValue();
pinotTaskConfigsForTable.addAll(
createPinotTaskConfigs(partitionedSegments,
offlineTableName, maxNumRecordsPerTask, mergeLevel,
- mergeConfigs, taskConfigs));
+ partition, mergeConfigs, taskConfigs));
}
if (!outlierSegments.isEmpty()) {
pinotTaskConfigsForTable.addAll(
createPinotTaskConfigs(outlierSegments, offlineTableName,
maxNumRecordsPerTask, mergeLevel,
- mergeConfigs, taskConfigs));
+ null, mergeConfigs, taskConfigs));
}
}
}
@@ -516,8 +519,8 @@ public class MergeRollupTaskGenerator extends
BaseTaskGenerator {
* Create pinot task configs with selected segments and configs
*/
private List<PinotTaskConfig> createPinotTaskConfigs(List<SegmentZKMetadata>
selectedSegments,
- String offlineTableName, int maxNumRecordsPerTask, String mergeLevel,
Map<String, String> mergeConfigs,
- Map<String, String> taskConfigs) {
+ String offlineTableName, int maxNumRecordsPerTask, String mergeLevel,
List<Integer> partition,
+ Map<String, String> mergeConfigs, Map<String, String> taskConfigs) {
int numRecordsPerTask = 0;
List<List<String>> segmentNamesList = new ArrayList<>();
List<List<String>> downloadURLsList = new ArrayList<>();
@@ -539,6 +542,15 @@ public class MergeRollupTaskGenerator extends
BaseTaskGenerator {
}
List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+ StringBuilder partitionSuffixBuilder = new StringBuilder();
+ if (partition != null && !partition.isEmpty()) {
+ for (int columnPartition : partition) {
+
partitionSuffixBuilder.append(DELIMITER_IN_SEGMENT_NAME).append(columnPartition);
+ }
+ }
+ String partitionSuffix = partitionSuffixBuilder.toString();
+
for (int i = 0; i < segmentNamesList.size(); i++) {
Map<String, String> configs = new HashMap<>();
configs.put(MinionConstants.TABLE_NAME_KEY, offlineTableName);
@@ -562,9 +574,13 @@ public class MergeRollupTaskGenerator extends
BaseTaskGenerator {
configs.put(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
mergeConfigs.get(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY));
+ // Segment name conflict happens when the current method "createPinotTaskConfigs" is invoked more than once within
+ // the same epoch millisecond, which may happen when there are multiple partitions.
+ // To prevent such name conflict, we include a partitionSeqSuffix to the segment name.
configs.put(MergeRollupTask.SEGMENT_NAME_PREFIX_KEY,
- MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel + "_" +
System.currentTimeMillis() + "_" + i + "_"
- + TableNameBuilder.extractRawTableName(offlineTableName));
+ MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel +
DELIMITER_IN_SEGMENT_NAME
+ + System.currentTimeMillis() + partitionSuffix +
DELIMITER_IN_SEGMENT_NAME + i
+ + DELIMITER_IN_SEGMENT_NAME +
TableNameBuilder.extractRawTableName(offlineTableName));
pinotTaskConfigs.add(new PinotTaskConfig(MergeRollupTask.TASK_TYPE,
configs));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]