hongkunxu commented on code in PR #18242: URL: https://github.com/apache/pinot/pull/18242#discussion_r3128081297
########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/materializedview/MaterializedViewTaskGenerator.java: ########## @@ -0,0 +1,594 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.minion.tasks.materializedview; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.helix.store.HelixPropertyStore; +import org.apache.helix.task.TaskState; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.minion.MvDefinitionMetadata; +import org.apache.pinot.common.minion.MvDefinitionMetadataUtils; +import org.apache.pinot.common.minion.MvFreshness; +import org.apache.pinot.common.minion.MvRuntimeMetadata; +import org.apache.pinot.common.minion.MvRuntimeMetadataUtils; +import org.apache.pinot.common.minion.PartitionFingerprint; +import org.apache.pinot.common.minion.PartitionInfo; +import org.apache.pinot.common.minion.PartitionState; +import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator; +import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.common.MinionConstants.MaterializedViewTask; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.spi.annotations.minion.TaskGenerator; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableTaskConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.DateTimeFieldSpec; +import org.apache.pinot.spi.data.DateTimeFormatSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.TimeUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Task generator for {@link MaterializedViewTask}. + * + * <p>Unlike segment-conversion tasks, this generator does not scan source segments. It only + * computes a time window and appends it to the user-defined SQL, producing a + * {@link PinotTaskConfig} for the executor. + * + * <p>Two-step decision logic (evaluated per table, per schedule cycle): + * <ol> + * <li><b>Overwrite STALE</b> – If any partition is marked {@link PartitionState#STALE} + * (by the event-driven {@code MaterializedViewConsistencyManager}), the generator + * performs a precise fingerprint verification. If the data truly changed, it generates + * an {@code OVERWRITE} task for the earliest stale partition. If the fingerprint + * matches (false positive), the partition is reverted to {@link PartitionState#VALID}. + * This step has the highest priority to maintain consistency.</li> + * <li><b>Append</b> – If no STALE partitions exist and the watermark can advance (next + * window is outside the buffer period), generate a normal {@code APPEND} task.</li> + * </ol> + * + * <p>Dirty marking (STALE detection) is handled externally by + * {@code MaterializedViewConsistencyManager}, which reacts to base table segment changes + * (add, replace, delete) and proactively marks affected partitions in + * {@link MvRuntimeMetadata}. + */ +@TaskGenerator +public class MaterializedViewTaskGenerator extends BaseTaskGenerator { + private static final Logger LOGGER = LoggerFactory.getLogger(MaterializedViewTaskGenerator.class); + + private static final String DEFAULT_BUCKET_PERIOD = "1d"; + + @Override + public String getTaskType() { + return MaterializedViewTask.TASK_TYPE; + } + + @Override + public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { + String taskType = MaterializedViewTask.TASK_TYPE; + List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); + + for (TableConfig tableConfig : tableConfigs) { + String offlineTableName = tableConfig.getTableName(); + + if (tableConfig.getTableType() != TableType.OFFLINE) { + LOGGER.warn("Skip generating task: {} for non-OFFLINE table: {}", taskType, offlineTableName); + continue; + } + LOGGER.info("Start generating task configs for table: {} for task: {}", offlineTableName, taskType); + + // Only schedule 1 task of this type per table + Map<String, TaskState> incompleteTasks = + TaskGeneratorUtils.getIncompleteTasks(taskType, offlineTableName, _clusterInfoAccessor); + if (!incompleteTasks.isEmpty()) { + LOGGER.warn("Found incomplete tasks: {} for table: {} and task type: {}. Skipping.", + incompleteTasks.keySet(), offlineTableName, taskType); + continue; + } + + TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig(); + Preconditions.checkState(tableTaskConfig != null); + Map<String, String> taskConfigs = tableTaskConfig.getConfigsForTaskType(taskType); + Preconditions.checkState(taskConfigs != null, "Task config shouldn't be null for table: %s", offlineTableName); + + String definedSQL = taskConfigs.get(MaterializedViewTask.DEFINED_SQL_KEY); + Preconditions.checkState(definedSQL != null && !definedSQL.isEmpty(), + "definedSQL must be specified for table: %s", offlineTableName); + + String sourceTableName = MaterializedViewAnalyzer.extractSourceTableName(definedSQL); + String sourceTableWithType = resolveSourceTableNameWithType(sourceTableName); + + // Bucket and buffer + String bucketTimePeriod = + taskConfigs.getOrDefault(MaterializedViewTask.BUCKET_TIME_PERIOD_KEY, DEFAULT_BUCKET_PERIOD); + long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod); + String bufferTimePeriod = + taskConfigs.getOrDefault(MaterializedViewTask.BUFFER_TIME_PERIOD_KEY, "0d"); + long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod); + + // Load upperExclusiveMs (watermark) and partitionInfos from MvRuntimeMetadata + String mvTableWithType = TableNameBuilder.OFFLINE.tableNameWithType(offlineTableName); + HelixPropertyStore<ZNRecord> propertyStore = + _clusterInfoAccessor.getPinotHelixResourceManager().getPropertyStore(); + long watermarkMs = getWatermarkMs(offlineTableName, sourceTableName, bucketMs, definedSQL); + + Stat rtStat = new Stat(); + MvRuntimeMetadata mvRuntime = MvRuntimeMetadataUtils.fetchWithVersion( + propertyStore, mvTableWithType, rtStat); + Map<Long, PartitionInfo> partitionInfos = new HashMap<>(); + int runtimeVersion = -1; + if (mvRuntime != null) { + partitionInfos = new HashMap<>(mvRuntime.getPartitions()); + runtimeVersion = rtStat.getVersion(); + } + + // ── Step 1a: Delete EXPIRED partitions (highest priority) ── + PinotTaskConfig deleteTask = tryGenerateDeleteTask(offlineTableName, taskConfigs, + partitionInfos, bucketMs); + if (deleteTask != null) { + pinotTaskConfigs.add(deleteTask); + LOGGER.info("Generated DELETE task for table: {}", offlineTableName); + continue; + } + + // ── Step 1b: Overwrite STALE partitions (with precise fingerprint verification) ── + PinotTaskConfig overwriteTask = tryGenerateOverwriteTask(offlineTableName, sourceTableName, + sourceTableWithType, definedSQL, taskConfigs, partitionInfos, bucketMs, + mvRuntime, runtimeVersion); + if (overwriteTask != null) { + pinotTaskConfigs.add(overwriteTask); + LOGGER.info("Generated OVERWRITE task for table: {}", offlineTableName); + continue; + } + + // ── Step 2: Append new data (advance watermark) ── + long windowStartMs = watermarkMs; + long windowEndMs = windowStartMs + bucketMs; + + if (windowEndMs <= System.currentTimeMillis() - bufferMs) { + PinotTaskConfig appendTask = buildTaskConfig(offlineTableName, sourceTableName, + sourceTableWithType, definedSQL, taskConfigs, windowStartMs, windowEndMs, + MaterializedViewTask.TASK_MODE_APPEND); + pinotTaskConfigs.add(appendTask); + LOGGER.info("Generated APPEND task for table: {} window [{}, {})", offlineTableName, + windowStartMs, windowEndMs); + continue; + } + + LOGGER.debug("MV table {} is caught up (watermark={}), no dirty partitions.", offlineTableName, watermarkMs); + } + return pinotTaskConfigs; + } + + /** + * Step 1a: Finds the earliest EXPIRED partition and generates a DELETE task for it. + * DELETE tasks only remove MV segments; no query is executed. + * + * @return a {@link PinotTaskConfig} for delete, or {@code null} if no EXPIRED partitions exist + */ + private PinotTaskConfig tryGenerateDeleteTask(String mvTableName, Map<String, String> taskConfigs, + Map<Long, PartitionInfo> partitionInfos, long bucketMs) { + long earliestExpiredMs = Long.MAX_VALUE; + for (Map.Entry<Long, PartitionInfo> entry : partitionInfos.entrySet()) { + if (entry.getValue().getState() == PartitionState.EXPIRED && entry.getKey() < earliestExpiredMs) { + earliestExpiredMs = entry.getKey(); + } + } + if (earliestExpiredMs == Long.MAX_VALUE) { + return null; + } + long windowStartMs = earliestExpiredMs; + long windowEndMs = windowStartMs + bucketMs; + LOGGER.info("Found EXPIRED partition at {} for table: {}. Generating DELETE task for window [{}, {})", + windowStartMs, mvTableName, windowStartMs, windowEndMs); + + Map<String, String> configs = new HashMap<>(); + configs.put(MinionConstants.TABLE_NAME_KEY, mvTableName); + configs.put(MaterializedViewTask.WINDOW_START_MS_KEY, String.valueOf(windowStartMs)); + configs.put(MaterializedViewTask.WINDOW_END_MS_KEY, String.valueOf(windowEndMs)); + configs.put(MaterializedViewTask.TASK_MODE_KEY, MaterializedViewTask.TASK_MODE_DELETE); + configs.put(MinionConstants.UPLOAD_URL_KEY, + _clusterInfoAccessor.getVipUrl() + "/segments"); + + return new PinotTaskConfig(MaterializedViewTask.TASK_TYPE, configs); + } + + /** + * Step 1b: Finds the earliest STALE partition and generates an OVERWRITE task for it. + * + * <p>Before generating the task, re-computes the fingerprint for that partition to handle: + * <ul> + * <li>STALE → EXPIRED promotion (all base segments deleted since marking)</li> + * <li>False positive recovery (fingerprint matches stored value → revert to VALID)</li> + * </ul> + * + * @return a {@link PinotTaskConfig} for overwrite, or {@code null} if no actionable STALE + * partitions exist + */ + private PinotTaskConfig tryGenerateOverwriteTask(String mvTableName, String sourceTableName, + String sourceTableWithType, String definedSQL, Map<String, String> taskConfigs, + Map<Long, PartitionInfo> partitionInfos, long bucketMs, + MvRuntimeMetadata mvRuntime, int runtimeVersion) { + long earliestStaleMs = Long.MAX_VALUE; + for (Map.Entry<Long, PartitionInfo> entry : partitionInfos.entrySet()) { + if (entry.getValue().getState() == PartitionState.STALE && entry.getKey() < earliestStaleMs) { + earliestStaleMs = entry.getKey(); + } + } + if (earliestStaleMs == Long.MAX_VALUE) { + return null; + } + + long windowStartMs = earliestStaleMs; + long windowEndMs = windowStartMs + bucketMs; + PartitionInfo staleInfo = partitionInfos.get(earliestStaleMs); + + PartitionFingerprint currentFp = computeWindowFingerprint(sourceTableWithType, windowStartMs, windowEndMs); + + if (currentFp.getSegmentCount() == 0) { + LOGGER.info("STALE partition [{}, {}) base data deleted for table: {}. Promoting to EXPIRED.", + windowStartMs, windowEndMs, mvTableName); + partitionInfos.put(earliestStaleMs, staleInfo.withState(PartitionState.EXPIRED)); + persistUpdatedPartitionInfos(mvTableName, partitionInfos, mvRuntime, runtimeVersion); + return null; + } + + if (currentFp.equals(staleInfo.getFingerprint())) { + LOGGER.info("STALE partition [{}, {}) fingerprint matches for table: {}. " + + "Reverting to VALID (false positive).", windowStartMs, windowEndMs, mvTableName); + partitionInfos.put(earliestStaleMs, staleInfo.withState(PartitionState.VALID)); + persistUpdatedPartitionInfos(mvTableName, partitionInfos, mvRuntime, runtimeVersion); + return null; + } + + LOGGER.info("Confirmed STALE partition at {} for table: {}. Generating OVERWRITE task for window [{}, {})", + windowStartMs, mvTableName, windowStartMs, windowEndMs); + return buildTaskConfig(mvTableName, sourceTableName, sourceTableWithType, definedSQL, + taskConfigs, windowStartMs, windowEndMs, MaterializedViewTask.TASK_MODE_OVERWRITE); + } + + /** + * Persists updated partitionInfos back to MV runtime in ZK (used for state promotions/reversions). + */ + private void persistUpdatedPartitionInfos(String mvTableName, Map<Long, PartitionInfo> partitionInfos, + MvRuntimeMetadata mvRuntime, int runtimeVersion) { + MvFreshness freshness = MvRuntimeMetadata.computeFreshness(partitionInfos); + MvRuntimeMetadata updated = new MvRuntimeMetadata( + mvRuntime.getMvTableNameWithType(), + mvRuntime.getWatermarkMs(), + mvRuntime.getCoverageUpperMs(), + freshness, + partitionInfos); + MvRuntimeMetadataUtils.persist( + _clusterInfoAccessor.getPinotHelixResourceManager().getPropertyStore(), + updated, runtimeVersion); + LOGGER.info("Persisted partition state changes for MV table: {}", mvTableName); + } + + /** + * Builds a complete {@link PinotTaskConfig} for either APPEND or OVERWRITE mode. + */ + private PinotTaskConfig buildTaskConfig(String mvTableName, String sourceTableName, + String sourceTableWithType, String definedSQL, Map<String, String> taskConfigs, + long windowStartMs, long windowEndMs, String taskMode) { + String taskType = MaterializedViewTask.TASK_TYPE; + + PartitionFingerprint windowFingerprint = + computeWindowFingerprint(sourceTableWithType, windowStartMs, windowEndMs); + + String sourceTimeColumn = resolveSourceTimeColumn(sourceTableName); + DateTimeFormatSpec timeFormatSpec = resolveSourceTimeFormatSpec(sourceTableName, sourceTimeColumn); + String windowStart = timeFormatSpec.fromMillisToFormat(windowStartMs); + String windowEnd = timeFormatSpec.fromMillisToFormat(windowEndMs); + String sqlWithTimeRange = appendTimeRange(definedSQL, sourceTimeColumn, windowStart, windowEnd); + sqlWithTimeRange = ensureLimit(sqlWithTimeRange, MaterializedViewTask.DEFAULT_MV_QUERY_LIMIT); + + Map<String, String> configs = new HashMap<>(); + configs.put(MinionConstants.TABLE_NAME_KEY, mvTableName); + configs.put(MaterializedViewTask.DEFINED_SQL_KEY, sqlWithTimeRange); + configs.put(MaterializedViewTask.ORIGINAL_DEFINED_SQL_KEY, definedSQL); + configs.put(MaterializedViewTask.WINDOW_START_MS_KEY, String.valueOf(windowStartMs)); + configs.put(MaterializedViewTask.WINDOW_END_MS_KEY, String.valueOf(windowEndMs)); + configs.put(MaterializedViewTask.SOURCE_TABLE_NAME_KEY, sourceTableName); + configs.put(MaterializedViewTask.TASK_MODE_KEY, taskMode); + configs.put(MinionConstants.UPLOAD_URL_KEY, + _clusterInfoAccessor.getVipUrl() + "/segments"); + + String maxNumRecords = taskConfigs.get(MaterializedViewTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY); + if (maxNumRecords != null) { + configs.put(MaterializedViewTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY, maxNumRecords); + } + + Map<Long, PartitionFingerprint> fingerprintMap = new HashMap<>(); + fingerprintMap.put(windowStartMs, windowFingerprint); + configs.put(MaterializedViewTask.PARTITION_FINGERPRINTS_KEY, + PartitionFingerprint.encodeMap(fingerprintMap)); + + return new PinotTaskConfig(taskType, configs); + } + + @Override + public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<String, String> taskConfigs) { + MaterializedViewAnalyzer.analyze( + taskConfigs.get(MaterializedViewTask.DEFINED_SQL_KEY), + tableConfig, schema, taskConfigs, _clusterInfoAccessor); + } + + /** + * Resolves the time column for the source table by looking up its TableConfig. + */ + private String resolveSourceTimeColumn(String rawSourceTableName) { + // Try OFFLINE first, then REALTIME + String sourceTableWithType = TableNameBuilder.OFFLINE.tableNameWithType(rawSourceTableName); + TableConfig sourceTableConfig = _clusterInfoAccessor.getTableConfig(sourceTableWithType); + if (sourceTableConfig == null) { + sourceTableWithType = TableNameBuilder.REALTIME.tableNameWithType(rawSourceTableName); + sourceTableConfig = _clusterInfoAccessor.getTableConfig(sourceTableWithType); + } + Preconditions.checkState(sourceTableConfig != null, + "Source table config not found for: %s", rawSourceTableName); + + String timeColumn = sourceTableConfig.getValidationConfig().getTimeColumnName(); + Preconditions.checkState(timeColumn != null && !timeColumn.isEmpty(), + "Time column not configured for source table: %s", rawSourceTableName); + return timeColumn; + } + + /** + * Resolves the {@link DateTimeFormatSpec} for the source table's time column by looking up + * the table schema. This spec is used to convert millisecond-based watermarks to the + * time column's native format (e.g. days since epoch for {@code 1:DAYS:EPOCH}). + */ + private DateTimeFormatSpec resolveSourceTimeFormatSpec(String rawSourceTableName, String timeColumn) { + String sourceTableWithType = resolveSourceTableNameWithType(rawSourceTableName); + Schema sourceSchema = _clusterInfoAccessor.getTableSchema(sourceTableWithType); + Preconditions.checkState(sourceSchema != null, + "Schema not found for source table: %s", rawSourceTableName); + + DateTimeFieldSpec fieldSpec = sourceSchema.getSpecForTimeColumn(timeColumn); + Preconditions.checkState(fieldSpec != null, + "No DateTimeFieldSpec found for time column '%s' in source table: %s", timeColumn, rawSourceTableName); + return fieldSpec.getFormatSpec(); + } + + /** + * Resolves the raw format string (e.g. {@code "1:MILLISECONDS:EPOCH"}) for the source + * table's time column, for persisting in {@link MvDefinitionMetadata.MvSplitSpec}. + */ + private String resolveSourceTimeFormat(String rawSourceTableName, String timeColumn) { + String sourceTableWithType = resolveSourceTableNameWithType(rawSourceTableName); + Schema sourceSchema = _clusterInfoAccessor.getTableSchema(sourceTableWithType); + Preconditions.checkState(sourceSchema != null, + "Schema not found for source table: %s", rawSourceTableName); + + DateTimeFieldSpec fieldSpec = sourceSchema.getSpecForTimeColumn(timeColumn); + Preconditions.checkState(fieldSpec != null, + "No DateTimeFieldSpec found for time column '%s' in source table: %s", timeColumn, rawSourceTableName); + return fieldSpec.getFormat(); + } + + /** + * Appends a time-range WHERE clause to the SQL. The window values must already be in the + * time column's native format (e.g. days since epoch, not milliseconds). If a WHERE clause + * already exists, appends with AND; otherwise inserts before GROUP BY / ORDER BY / the + * trailing semicolon. + */ + static String appendTimeRange(String sql, String timeColumn, String windowStart, String windowEnd) { + // Validate column name to prevent SQL injection (column names must be simple identifiers). + Preconditions.checkArgument(timeColumn.matches("[A-Za-z_][A-Za-z0-9_.]*"), + "Time column name contains invalid characters: %s", timeColumn); + // windowStart/windowEnd values come from DateTimeFormatSpec.fromMillisToFormat which may produce + // epoch-numeric strings (safe as-is) or date strings (must be quoted so the SQL parser treats + // them as string literals rather than unquoted expressions that could be misinterpreted). + String quotedStart = isNumeric(windowStart) ? windowStart : "'" + windowStart + "'"; + String quotedEnd = isNumeric(windowEnd) ? windowEnd : "'" + windowEnd + "'"; + String timeFilter = timeColumn + " >= " + quotedStart + " AND " + timeColumn + " < " + quotedEnd; + + // Remove trailing semicolon for easier manipulation + String trimmed = sql.trim(); + if (trimmed.endsWith(";")) { + trimmed = trimmed.substring(0, trimmed.length() - 1).trim(); + } + + String upperSql = trimmed.toUpperCase(); + int whereIdx = upperSql.indexOf(" WHERE "); + if (whereIdx >= 0) { + // Find the end of the existing WHERE conditions (before GROUP BY, ORDER BY, LIMIT, or end) + int insertPos = findClauseEnd(upperSql, whereIdx + 7); + return trimmed.substring(0, insertPos) + " AND " + timeFilter + trimmed.substring(insertPos); + } + + // No WHERE — insert before GROUP BY / ORDER BY / LIMIT / HAVING / end + int insertPos = findClauseEnd(upperSql, upperSql.indexOf(" FROM ") + 6); + // Move past the table name to find where to insert + insertPos = findClauseEnd(upperSql, insertPos); + return trimmed.substring(0, insertPos) + " WHERE " + timeFilter + trimmed.substring(insertPos); + } + + /** + * Ensures the SQL contains an explicit LIMIT clause. If the user's SQL already has one, it is + * left unchanged; otherwise {@code defaultLimit} is appended. This prevents the broker from + * applying its own default limit (typically 10) which would silently truncate MV results. + */ + static String ensureLimit(String sql, int defaultLimit) { + String trimmed = sql.trim(); + if (trimmed.endsWith(";")) { + trimmed = trimmed.substring(0, trimmed.length() - 1).trim(); + } + if (trimmed.toUpperCase().contains(" LIMIT ")) { + return trimmed; + } + return trimmed + " LIMIT " + defaultLimit; Review Comment: fixed! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
