Jackie-Jiang commented on code in PR #17624: URL: https://github.com/apache/pinot/pull/17624#discussion_r2796305953
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/LocalValidDocIdsSnapshotMetadata.java: ########## @@ -0,0 +1,401 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.HashFunction; +import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Metadata class to store table configuration information alongside local validDocIds snapshots. 
+ * This is used to check if the local validDocIds snapshots are compatible with the current table + * configuration before using them during server restart/preload. + * + * The metadata file is stored at the table partition level, so that all segments in the same partition + * share the same metadata. The file is named "upsert.snapshot.metadata" and is stored in the table + * index directory. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class LocalValidDocIdsSnapshotMetadata { + private static final Logger LOGGER = LoggerFactory.getLogger(LocalValidDocIdsSnapshotMetadata.class); + + public static final String METADATA_FILE_NAME = "upsert.snapshot.metadata.partition."; + public static final int CURRENT_VERSION = 1; + private int _version = CURRENT_VERSION; + private int _partitionId; + private long _creationTime; + private List<String> _primaryKeyColumns; + private List<String> _comparisonColumns; + private String _deleteRecordColumn; + private HashFunction _hashFunction; + private double _metadataTTL; + private double _deletedKeysTTL; + private UpsertConfig.Mode _upsertMode; + private Map<String, UpsertConfig.Strategy> _partialUpsertStrategies; + private UpsertConfig.Strategy _defaultPartialUpsertStrategy; + private int _numPartitions; + + public LocalValidDocIdsSnapshotMetadata() { + } + + /** + * Creates metadata from the given UpsertContext and TableConfig. 
+ */ + public static LocalValidDocIdsSnapshotMetadata fromUpsertContext(int partitionId, UpsertContext context) { + LocalValidDocIdsSnapshotMetadata metadata = new LocalValidDocIdsSnapshotMetadata(); + metadata.setVersion(CURRENT_VERSION); + metadata.setPartitionId(partitionId); + metadata.setCreationTime(System.currentTimeMillis()); + metadata.setPrimaryKeyColumns(context.getPrimaryKeyColumns()); + metadata.setComparisonColumns(context.getComparisonColumns()); + metadata.setDeleteRecordColumn(context.getDeleteRecordColumn()); + metadata.setHashFunction(context.getHashFunction()); + metadata.setMetadataTTL(context.getMetadataTTL()); + metadata.setDeletedKeysTTL(context.getDeletedKeysTTL()); + + TableConfig tableConfig = context.getTableConfig(); + if (tableConfig != null) { + UpsertConfig.Mode upsertMode = tableConfig.getUpsertMode(); + metadata.setUpsertMode(upsertMode); + if (upsertMode == UpsertConfig.Mode.PARTIAL) { + UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); + if (upsertConfig != null) { + metadata.setPartialUpsertStrategies(upsertConfig.getPartialUpsertStrategies()); + metadata.setDefaultPartialUpsertStrategy(upsertConfig.getDefaultPartialUpsertStrategy()); + } + } + // Get number of partitions from segment partition config + metadata.setNumPartitions(getNumPartitionsFromTableConfig(tableConfig)); + } + + return metadata; + } + + /** + * Gets the number of partitions from the table's segment partition config. 
+ * + * @param tableConfig the table configuration + * @return the number of partitions, or 0 if not configured + */ + private static int getNumPartitionsFromTableConfig(TableConfig tableConfig) { + if (tableConfig.getIndexingConfig() == null) { + return 0; + } + SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig(); + if (segmentPartitionConfig == null || segmentPartitionConfig.getColumnPartitionMap() == null + || segmentPartitionConfig.getColumnPartitionMap().isEmpty()) { + return 0; + } + // For upsert tables, typically there's only one partition column + for (ColumnPartitionConfig columnPartitionConfig : segmentPartitionConfig.getColumnPartitionMap().values()) { + return columnPartitionConfig.getNumPartitions(); + } + return 0; + } + + /** + * Reads the metadata from the given directory for the specified partition. + * + * @param tableIndexDir the table index directory + * @param partitionId the partition ID + * @return the metadata, or null if the file does not exist or cannot be read + */ + @Nullable + public static LocalValidDocIdsSnapshotMetadata fromDirectory(File tableIndexDir, int partitionId) { + File metadataFile = new File(tableIndexDir, METADATA_FILE_NAME + partitionId); + if (!metadataFile.exists()) { + LOGGER.debug("Metadata file {} does not exist", metadataFile.getAbsolutePath()); + return null; + } + try { + return JsonUtils.fileToObject(metadataFile, LocalValidDocIdsSnapshotMetadata.class); + } catch (Exception e) { + LOGGER.warn("Failed to read metadata file {}: {}", metadataFile.getAbsolutePath(), e.getMessage()); + return null; + } + } + + /** + * Persists the metadata to the given directory. 
+ * + * @param tableIndexDir the table index directory + * @throws IOException if the file cannot be written + */ + public void persist(File tableIndexDir) + throws IOException { + File metadataFile = new File(tableIndexDir, METADATA_FILE_NAME + _partitionId); + FileUtils.write(metadataFile, JsonUtils.objectToString(this), StandardCharsets.UTF_8); + LOGGER.debug("Persisted upsert snapshot metadata to {}", metadataFile.getAbsolutePath()); + } + + /** + * Checks if this metadata is compatible with the given UpsertContext. + * Returns true if the snapshots can be safely used for preloading. + * + * @param context the current UpsertContext + * @param tableName the table name (for logging) + * @return true if compatible, false otherwise + */ + @JsonIgnore + public boolean isCompatible(UpsertContext context, String tableName) { + if (!Objects.equals(_primaryKeyColumns, context.getPrimaryKeyColumns())) { + LOGGER.info("Previous snapshot used primary keys: {} different from current: {} for table: {}, partition: {}", + _primaryKeyColumns, context.getPrimaryKeyColumns(), tableName, _partitionId); + return false; + } + if (!Objects.equals(_comparisonColumns, context.getComparisonColumns())) { + LOGGER.info( + "Previous snapshot used comparison columns: {} different from current: {} for table: {}, " + "partition: {}", + _comparisonColumns, context.getComparisonColumns(), tableName, _partitionId); + return false; + } + + if (!StringUtils.equals(_deleteRecordColumn, context.getDeleteRecordColumn())) { + LOGGER.info( + "Previous snapshot used deleteRecordColumn: {} different from current: {} for table: {}, " + "partition: {}", + _deleteRecordColumn, context.getDeleteRecordColumn(), tableName, _partitionId); + return false; + } + if (_hashFunction != context.getHashFunction()) { + LOGGER.info("Previous snapshot used hash function: {} different from current: {} for table: {}, partition: {}", + _hashFunction, context.getHashFunction(), tableName, _partitionId); + return false; + } + 
if (Double.compare(_metadataTTL, context.getMetadataTTL()) != 0) { + LOGGER.info("Previous snapshot used metadataTTL: {} different from current: {} for table: {}, partition: {}", + _metadataTTL, context.getMetadataTTL(), tableName, _partitionId); + return false; + } + if (Double.compare(_deletedKeysTTL, context.getDeletedKeysTTL()) != 0) { + LOGGER.info("Previous snapshot used deletedKeysTTL: {} different from current: {} for table: {}, partition: {}", + _deletedKeysTTL, context.getDeletedKeysTTL(), tableName, _partitionId); + return false; + } + TableConfig tableConfig = context.getTableConfig(); + if (tableConfig != null) { + UpsertConfig.Mode currentUpsertMode = tableConfig.getUpsertMode(); + if (_upsertMode != null && _upsertMode != currentUpsertMode) { + LOGGER.info("Previous snapshot used upsert mode: {} different from current: {} for table: {}, partition: {}", + _upsertMode, currentUpsertMode, tableName, _partitionId); + return false; + } + + if (currentUpsertMode == UpsertConfig.Mode.PARTIAL) { + UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); + if (upsertConfig != null) { + if (_defaultPartialUpsertStrategy != null + && _defaultPartialUpsertStrategy != upsertConfig.getDefaultPartialUpsertStrategy()) { + LOGGER.info("Previous snapshot used default partial strategy: {} different from current: {} for table: " + + "{}, partition: {}", _defaultPartialUpsertStrategy, + upsertConfig.getDefaultPartialUpsertStrategy(), + tableName, _partitionId); + return false; + } + + if (_partialUpsertStrategies != null && !_partialUpsertStrategies.equals( + upsertConfig.getPartialUpsertStrategies())) { + LOGGER.info("Previous snapshot used partial upsert strategies: {} different from current: {} for table: " + + "{}, partition: {}", _partialUpsertStrategies, upsertConfig.getPartialUpsertStrategies(), + tableName, + _partitionId); + return false; + } + } + } + int currentNumPartitions = getNumPartitionsFromTableConfig(tableConfig); Review Comment: This shouldn't be 
relevant. All the segments are from the same partition ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java: ########## @@ -1088,6 +1189,46 @@ private void updateSnapshotMetrics(int numImmutableSegments, long numPrimaryKeys } } + /** + * Persists the local validDocIds snapshot metadata to the table index directory. + * This metadata contains the table configuration that was used when taking the snapshots, + * allowing us to check compatibility while preloading using validdoc id snapshot on disk. + * + * This is called AFTER preload completes (not during construction) so that: + * 1. We can first check compatibility with the OLD metadata from previous run + * 2. Only after checking, we overwrite with the new/current config + * 3. On next restart, this saved metadata will be used for compatibility check + */ + protected void persistLocalSnapshotMetadata() { + try { + LocalValidDocIdsSnapshotMetadata metadata = + LocalValidDocIdsSnapshotMetadata.fromUpsertContext(_partitionId, _context); + metadata.persist(_tableIndexDir); + _logger.info("Persisted local validDocIds snapshot metadata for partition: {}", _partitionId); + } catch (Exception e) { + _logger.warn("Failed to persist local validDocIds snapshot metadata for partition: {}", _partitionId, e); + } + } + + /** + * Checks if the local validDocIds snapshots are compatible with the current table configuration. + * This is used during preload to determine if the snapshots can be safely used. 
+ * + * @return true if compatible or no metadata exists (for backward compatibility), false otherwise + */ + protected boolean isLocalSnapshotMetadataCompatible() { + LocalValidDocIdsSnapshotMetadata metadata = + LocalValidDocIdsSnapshotMetadata.fromDirectory(_tableIndexDir, _partitionId); + if (metadata == null) { + // In the initial restart, the preload might take longer as the metadata file wouldn't exist to compare + // To take the worst case approach, the snapshot was made in compatible + _logger.info("No local validDocIds snapshot metadata found for partition: {}, allowing preload for backward " + + "compatibility", _partitionId); + return false; Review Comment: Is this correct? The javadoc says this returns true when no metadata exists, and the log message says preload is allowed for backward compatibility, but the code returns `false` (incompatible). Shouldn't it return `true`? ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/LocalValidDocIdsSnapshotMetadata.java: ########## @@ -0,0 +1,401 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.segment.local.upsert; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.HashFunction; +import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Metadata class to store table configuration information alongside local validDocIds snapshots. + * This is used to check if the local validDocIds snapshots are compatible with the current table + * configuration before using them during server restart/preload. + * + * The metadata file is stored at the table partition level, so that all segments in the same partition + * share the same metadata. The file is named "upsert.snapshot.metadata" and is stored in the table + * index directory. 
+ */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class LocalValidDocIdsSnapshotMetadata { + private static final Logger LOGGER = LoggerFactory.getLogger(LocalValidDocIdsSnapshotMetadata.class); + + public static final String METADATA_FILE_NAME = "upsert.snapshot.metadata.partition."; + public static final int CURRENT_VERSION = 1; + private int _version = CURRENT_VERSION; + private int _partitionId; + private long _creationTime; + private List<String> _primaryKeyColumns; + private List<String> _comparisonColumns; + private String _deleteRecordColumn; + private HashFunction _hashFunction; + private double _metadataTTL; + private double _deletedKeysTTL; + private UpsertConfig.Mode _upsertMode; + private Map<String, UpsertConfig.Strategy> _partialUpsertStrategies; + private UpsertConfig.Strategy _defaultPartialUpsertStrategy; + private int _numPartitions; + + public LocalValidDocIdsSnapshotMetadata() { + } + + /** + * Creates metadata from the given UpsertContext and TableConfig. + */ + public static LocalValidDocIdsSnapshotMetadata fromUpsertContext(int partitionId, UpsertContext context) { + LocalValidDocIdsSnapshotMetadata metadata = new LocalValidDocIdsSnapshotMetadata(); + metadata.setVersion(CURRENT_VERSION); + metadata.setPartitionId(partitionId); + metadata.setCreationTime(System.currentTimeMillis()); + metadata.setPrimaryKeyColumns(context.getPrimaryKeyColumns()); + metadata.setComparisonColumns(context.getComparisonColumns()); + metadata.setDeleteRecordColumn(context.getDeleteRecordColumn()); + metadata.setHashFunction(context.getHashFunction()); + metadata.setMetadataTTL(context.getMetadataTTL()); + metadata.setDeletedKeysTTL(context.getDeletedKeysTTL()); + + TableConfig tableConfig = context.getTableConfig(); + if (tableConfig != null) { + UpsertConfig.Mode upsertMode = tableConfig.getUpsertMode(); + metadata.setUpsertMode(upsertMode); + if (upsertMode == UpsertConfig.Mode.PARTIAL) { + UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); + if 
(upsertConfig != null) { + metadata.setPartialUpsertStrategies(upsertConfig.getPartialUpsertStrategies()); + metadata.setDefaultPartialUpsertStrategy(upsertConfig.getDefaultPartialUpsertStrategy()); + } + } + // Get number of partitions from segment partition config + metadata.setNumPartitions(getNumPartitionsFromTableConfig(tableConfig)); + } + + return metadata; + } + + /** + * Gets the number of partitions from the table's segment partition config. + * + * @param tableConfig the table configuration + * @return the number of partitions, or 0 if not configured + */ + private static int getNumPartitionsFromTableConfig(TableConfig tableConfig) { + if (tableConfig.getIndexingConfig() == null) { + return 0; + } + SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig(); + if (segmentPartitionConfig == null || segmentPartitionConfig.getColumnPartitionMap() == null + || segmentPartitionConfig.getColumnPartitionMap().isEmpty()) { + return 0; + } + // For upsert tables, typically there's only one partition column + for (ColumnPartitionConfig columnPartitionConfig : segmentPartitionConfig.getColumnPartitionMap().values()) { + return columnPartitionConfig.getNumPartitions(); + } + return 0; + } + + /** + * Reads the metadata from the given directory for the specified partition. 
+ * + * @param tableIndexDir the table index directory + * @param partitionId the partition ID + * @return the metadata, or null if the file does not exist or cannot be read + */ + @Nullable + public static LocalValidDocIdsSnapshotMetadata fromDirectory(File tableIndexDir, int partitionId) { + File metadataFile = new File(tableIndexDir, METADATA_FILE_NAME + partitionId); + if (!metadataFile.exists()) { + LOGGER.debug("Metadata file {} does not exist", metadataFile.getAbsolutePath()); + return null; + } + try { + return JsonUtils.fileToObject(metadataFile, LocalValidDocIdsSnapshotMetadata.class); + } catch (Exception e) { + LOGGER.warn("Failed to read metadata file {}: {}", metadataFile.getAbsolutePath(), e.getMessage()); + return null; + } + } + + /** + * Persists the metadata to the given directory. + * + * @param tableIndexDir the table index directory + * @throws IOException if the file cannot be written + */ + public void persist(File tableIndexDir) + throws IOException { + File metadataFile = new File(tableIndexDir, METADATA_FILE_NAME + _partitionId); + FileUtils.write(metadataFile, JsonUtils.objectToString(this), StandardCharsets.UTF_8); + LOGGER.debug("Persisted upsert snapshot metadata to {}", metadataFile.getAbsolutePath()); + } + + /** + * Checks if this metadata is compatible with the given UpsertContext. + * Returns true if the snapshots can be safely used for preloading. 
+ * + * @param context the current UpsertContext + * @param tableName the table name (for logging) + * @return true if compatible, false otherwise + */ + @JsonIgnore + public boolean isCompatible(UpsertContext context, String tableName) { + if (!Objects.equals(_primaryKeyColumns, context.getPrimaryKeyColumns())) { + LOGGER.info("Previous snapshot used primary keys: {} different from current: {} for table: {}, partition: {}", + _primaryKeyColumns, context.getPrimaryKeyColumns(), tableName, _partitionId); + return false; + } + if (!Objects.equals(_comparisonColumns, context.getComparisonColumns())) { + LOGGER.info( + "Previous snapshot used comparison columns: {} different from current: {} for table: {}, " + "partition: {}", + _comparisonColumns, context.getComparisonColumns(), tableName, _partitionId); + return false; + } + + if (!StringUtils.equals(_deleteRecordColumn, context.getDeleteRecordColumn())) { + LOGGER.info( + "Previous snapshot used deleteRecordColumn: {} different from current: {} for table: {}, " + "partition: {}", + _deleteRecordColumn, context.getDeleteRecordColumn(), tableName, _partitionId); + return false; + } + if (_hashFunction != context.getHashFunction()) { + LOGGER.info("Previous snapshot used hash function: {} different from current: {} for table: {}, partition: {}", + _hashFunction, context.getHashFunction(), tableName, _partitionId); + return false; + } + if (Double.compare(_metadataTTL, context.getMetadataTTL()) != 0) { + LOGGER.info("Previous snapshot used metadataTTL: {} different from current: {} for table: {}, partition: {}", + _metadataTTL, context.getMetadataTTL(), tableName, _partitionId); + return false; + } + if (Double.compare(_deletedKeysTTL, context.getDeletedKeysTTL()) != 0) { + LOGGER.info("Previous snapshot used deletedKeysTTL: {} different from current: {} for table: {}, partition: {}", + _deletedKeysTTL, context.getDeletedKeysTTL(), tableName, _partitionId); + return false; + } + TableConfig tableConfig = 
context.getTableConfig(); + if (tableConfig != null) { + UpsertConfig.Mode currentUpsertMode = tableConfig.getUpsertMode(); + if (_upsertMode != null && _upsertMode != currentUpsertMode) { + LOGGER.info("Previous snapshot used upsert mode: {} different from current: {} for table: {}, partition: {}", + _upsertMode, currentUpsertMode, tableName, _partitionId); + return false; + } + + if (currentUpsertMode == UpsertConfig.Mode.PARTIAL) { Review Comment: Partial upsert settings shouldn't be relevant to this compatibility check ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/LocalValidDocIdsSnapshotMetadata.java: ########## @@ -0,0 +1,401 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.segment.local.upsert; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.HashFunction; +import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Metadata class to store table configuration information alongside local validDocIds snapshots. + * This is used to check if the local validDocIds snapshots are compatible with the current table + * configuration before using them during server restart/preload. + * + * The metadata file is stored at the table partition level, so that all segments in the same partition + * share the same metadata. The file is named "upsert.snapshot.metadata" and is stored in the table + * index directory. 
+ */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class LocalValidDocIdsSnapshotMetadata { + private static final Logger LOGGER = LoggerFactory.getLogger(LocalValidDocIdsSnapshotMetadata.class); + + public static final String METADATA_FILE_NAME = "upsert.snapshot.metadata.partition."; + public static final int CURRENT_VERSION = 1; + private int _version = CURRENT_VERSION; + private int _partitionId; + private long _creationTime; + private List<String> _primaryKeyColumns; + private List<String> _comparisonColumns; + private String _deleteRecordColumn; + private HashFunction _hashFunction; + private double _metadataTTL; + private double _deletedKeysTTL; + private UpsertConfig.Mode _upsertMode; + private Map<String, UpsertConfig.Strategy> _partialUpsertStrategies; + private UpsertConfig.Strategy _defaultPartialUpsertStrategy; + private int _numPartitions; + + public LocalValidDocIdsSnapshotMetadata() { + } + + /** + * Creates metadata from the given UpsertContext and TableConfig. + */ + public static LocalValidDocIdsSnapshotMetadata fromUpsertContext(int partitionId, UpsertContext context) { + LocalValidDocIdsSnapshotMetadata metadata = new LocalValidDocIdsSnapshotMetadata(); + metadata.setVersion(CURRENT_VERSION); + metadata.setPartitionId(partitionId); + metadata.setCreationTime(System.currentTimeMillis()); + metadata.setPrimaryKeyColumns(context.getPrimaryKeyColumns()); + metadata.setComparisonColumns(context.getComparisonColumns()); + metadata.setDeleteRecordColumn(context.getDeleteRecordColumn()); + metadata.setHashFunction(context.getHashFunction()); + metadata.setMetadataTTL(context.getMetadataTTL()); + metadata.setDeletedKeysTTL(context.getDeletedKeysTTL()); + + TableConfig tableConfig = context.getTableConfig(); + if (tableConfig != null) { + UpsertConfig.Mode upsertMode = tableConfig.getUpsertMode(); + metadata.setUpsertMode(upsertMode); + if (upsertMode == UpsertConfig.Mode.PARTIAL) { + UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); + if 
(upsertConfig != null) { + metadata.setPartialUpsertStrategies(upsertConfig.getPartialUpsertStrategies()); + metadata.setDefaultPartialUpsertStrategy(upsertConfig.getDefaultPartialUpsertStrategy()); + } + } + // Get number of partitions from segment partition config + metadata.setNumPartitions(getNumPartitionsFromTableConfig(tableConfig)); + } + + return metadata; + } + + /** + * Gets the number of partitions from the table's segment partition config. + * + * @param tableConfig the table configuration + * @return the number of partitions, or 0 if not configured + */ + private static int getNumPartitionsFromTableConfig(TableConfig tableConfig) { + if (tableConfig.getIndexingConfig() == null) { + return 0; + } + SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig(); + if (segmentPartitionConfig == null || segmentPartitionConfig.getColumnPartitionMap() == null + || segmentPartitionConfig.getColumnPartitionMap().isEmpty()) { + return 0; + } + // For upsert tables, typically there's only one partition column + for (ColumnPartitionConfig columnPartitionConfig : segmentPartitionConfig.getColumnPartitionMap().values()) { + return columnPartitionConfig.getNumPartitions(); + } + return 0; + } + + /** + * Reads the metadata from the given directory for the specified partition. 
+ * + * @param tableIndexDir the table index directory + * @param partitionId the partition ID + * @return the metadata, or null if the file does not exist or cannot be read + */ + @Nullable + public static LocalValidDocIdsSnapshotMetadata fromDirectory(File tableIndexDir, int partitionId) { + File metadataFile = new File(tableIndexDir, METADATA_FILE_NAME + partitionId); + if (!metadataFile.exists()) { + LOGGER.debug("Metadata file {} does not exist", metadataFile.getAbsolutePath()); + return null; + } + try { + return JsonUtils.fileToObject(metadataFile, LocalValidDocIdsSnapshotMetadata.class); + } catch (Exception e) { + LOGGER.warn("Failed to read metadata file {}: {}", metadataFile.getAbsolutePath(), e.getMessage()); + return null; + } + } + + /** + * Persists the metadata to the given directory. + * + * @param tableIndexDir the table index directory + * @throws IOException if the file cannot be written + */ + public void persist(File tableIndexDir) + throws IOException { + File metadataFile = new File(tableIndexDir, METADATA_FILE_NAME + _partitionId); + FileUtils.write(metadataFile, JsonUtils.objectToString(this), StandardCharsets.UTF_8); + LOGGER.debug("Persisted upsert snapshot metadata to {}", metadataFile.getAbsolutePath()); + } + + /** + * Checks if this metadata is compatible with the given UpsertContext. + * Returns true if the snapshots can be safely used for preloading. + * + * @param context the current UpsertContext + * @param tableName the table name (for logging) + * @return true if compatible, false otherwise + */ + @JsonIgnore + public boolean isCompatible(UpsertContext context, String tableName) { Review Comment: I feel this is too strict. Do we need to check so many variables? 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java: ########## @@ -254,13 +258,32 @@ public void preloadSegments(IndexLoadingConfig indexLoadingConfig) { } finally { _isPreloading = false; _preloadLock.unlock(); + // Persist snapshot metadata AFTER preload completes (whether successful or not). + // This ensures we first check compatibility with OLD metadata before overwriting with new config. + // The metadata will be used on the next restart to check if validDocId snapshots are still compatible. + if (_enableSnapshot) { + deleteStaleSnapshotMetadataFile(); + persistLocalSnapshotMetadata(); Review Comment: Do we need to re-generate the metadata when it is already compatible? ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/LocalValidDocIdsSnapshotMetadata.java: ########## @@ -0,0 +1,401 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.upsert; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.HashFunction; +import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Metadata class to store table configuration information alongside local validDocIds snapshots. + * This is used to check if the local validDocIds snapshots are compatible with the current table + * configuration before using them during server restart/preload. + * + * The metadata file is stored at the table partition level, so that all segments in the same partition + * share the same metadata. The file is named "upsert.snapshot.metadata" and is stored in the table + * index directory. 
+ */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class LocalValidDocIdsSnapshotMetadata { + private static final Logger LOGGER = LoggerFactory.getLogger(LocalValidDocIdsSnapshotMetadata.class); + + public static final String METADATA_FILE_NAME = "upsert.snapshot.metadata.partition."; + public static final int CURRENT_VERSION = 1; + private int _version = CURRENT_VERSION; + private int _partitionId; + private long _creationTime; + private List<String> _primaryKeyColumns; + private List<String> _comparisonColumns; + private String _deleteRecordColumn; + private HashFunction _hashFunction; + private double _metadataTTL; + private double _deletedKeysTTL; + private UpsertConfig.Mode _upsertMode; + private Map<String, UpsertConfig.Strategy> _partialUpsertStrategies; + private UpsertConfig.Strategy _defaultPartialUpsertStrategy; + private int _numPartitions; + + public LocalValidDocIdsSnapshotMetadata() { + } + + /** + * Creates metadata from the given UpsertContext and TableConfig. + */ + public static LocalValidDocIdsSnapshotMetadata fromUpsertContext(int partitionId, UpsertContext context) { + LocalValidDocIdsSnapshotMetadata metadata = new LocalValidDocIdsSnapshotMetadata(); + metadata.setVersion(CURRENT_VERSION); + metadata.setPartitionId(partitionId); + metadata.setCreationTime(System.currentTimeMillis()); + metadata.setPrimaryKeyColumns(context.getPrimaryKeyColumns()); + metadata.setComparisonColumns(context.getComparisonColumns()); + metadata.setDeleteRecordColumn(context.getDeleteRecordColumn()); + metadata.setHashFunction(context.getHashFunction()); + metadata.setMetadataTTL(context.getMetadataTTL()); + metadata.setDeletedKeysTTL(context.getDeletedKeysTTL()); + + TableConfig tableConfig = context.getTableConfig(); + if (tableConfig != null) { + UpsertConfig.Mode upsertMode = tableConfig.getUpsertMode(); + metadata.setUpsertMode(upsertMode); + if (upsertMode == UpsertConfig.Mode.PARTIAL) { + UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); + if 
(upsertConfig != null) { + metadata.setPartialUpsertStrategies(upsertConfig.getPartialUpsertStrategies()); + metadata.setDefaultPartialUpsertStrategy(upsertConfig.getDefaultPartialUpsertStrategy()); + } + } + // Get number of partitions from segment partition config + metadata.setNumPartitions(getNumPartitionsFromTableConfig(tableConfig)); + } + + return metadata; + } + + /** + * Gets the number of partitions from the table's segment partition config. + * + * @param tableConfig the table configuration + * @return the number of partitions, or 0 if not configured + */ + private static int getNumPartitionsFromTableConfig(TableConfig tableConfig) { + if (tableConfig.getIndexingConfig() == null) { + return 0; + } + SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig(); + if (segmentPartitionConfig == null || segmentPartitionConfig.getColumnPartitionMap() == null + || segmentPartitionConfig.getColumnPartitionMap().isEmpty()) { + return 0; + } + // For upsert tables, typically there's only one partition column + for (ColumnPartitionConfig columnPartitionConfig : segmentPartitionConfig.getColumnPartitionMap().values()) { + return columnPartitionConfig.getNumPartitions(); + } + return 0; + } + + /** + * Reads the metadata from the given directory for the specified partition. 
+ * + * @param tableIndexDir the table index directory + * @param partitionId the partition ID + * @return the metadata, or null if the file does not exist or cannot be read + */ + @Nullable + public static LocalValidDocIdsSnapshotMetadata fromDirectory(File tableIndexDir, int partitionId) { + File metadataFile = new File(tableIndexDir, METADATA_FILE_NAME + partitionId); + if (!metadataFile.exists()) { + LOGGER.debug("Metadata file {} does not exist", metadataFile.getAbsolutePath()); + return null; + } + try { + return JsonUtils.fileToObject(metadataFile, LocalValidDocIdsSnapshotMetadata.class); + } catch (Exception e) { + LOGGER.warn("Failed to read metadata file {}: {}", metadataFile.getAbsolutePath(), e.getMessage()); + return null; + } + } + + /** + * Persists the metadata to the given directory. + * + * @param tableIndexDir the table index directory + * @throws IOException if the file cannot be written + */ + public void persist(File tableIndexDir) + throws IOException { + File metadataFile = new File(tableIndexDir, METADATA_FILE_NAME + _partitionId); + FileUtils.write(metadataFile, JsonUtils.objectToString(this), StandardCharsets.UTF_8); + LOGGER.debug("Persisted upsert snapshot metadata to {}", metadataFile.getAbsolutePath()); + } + + /** + * Checks if this metadata is compatible with the given UpsertContext. + * Returns true if the snapshots can be safely used for preloading. 
+ * + * @param context the current UpsertContext + * @param tableName the table name (for logging) + * @return true if compatible, false otherwise + */ + @JsonIgnore + public boolean isCompatible(UpsertContext context, String tableName) { + if (!Objects.equals(_primaryKeyColumns, context.getPrimaryKeyColumns())) { + LOGGER.info("Previous snapshot used primary keys: {} different from current: {} for table: {}, partition: {}", + _primaryKeyColumns, context.getPrimaryKeyColumns(), tableName, _partitionId); + return false; + } + if (!Objects.equals(_comparisonColumns, context.getComparisonColumns())) { + LOGGER.info( + "Previous snapshot used comparison columns: {} different from current: {} for table: {}, " + "partition: {}", + _comparisonColumns, context.getComparisonColumns(), tableName, _partitionId); + return false; + } + + if (!StringUtils.equals(_deleteRecordColumn, context.getDeleteRecordColumn())) { Review Comment: (minor) Use `Objects.equals()` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
