prashantwason commented on a change in pull request #2064: URL: https://github.com/apache/hudi/pull/2064#discussion_r497097206
########## File path: hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataImpl.java ########## @@ -0,0 +1,1104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metadata; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieCleanerPlan; +import org.apache.hudi.avro.model.HoodieRestoreMetadata; +import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.client.HoodieWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.utils.ClientUtils; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.fs.ConsistencyGuardConfig; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodiePartitionMetadata; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; +import org.apache.hudi.common.util.CleanerUtils; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.SpillableMapUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieMetricsConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieMetadataException; +import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.hudi.metrics.HoodieMetrics; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import com.codahale.metrics.Timer; + +import scala.Tuple2; + +/** + * Metadata implementation which saves partition and file listing within an internal MOR table + * called Metadata Table. This table is created by listing files and partitions (first time) and kept in sync + * using the instants on the main dataset. + */ +public class HoodieMetadataImpl { + private static final Logger LOG = LogManager.getLogger(HoodieMetadataImpl.class); + + // Table name suffix + private static final String METADATA_TABLE_NAME_SUFFIX = "_metadata"; + // Timestamp for a commit when the base dataset had not had any commits yet. + private static final String SOLO_COMMIT_TIMESTAMP = "00000000000000"; + + // Name of partition which saves file listings + public static final String METADATA_PARTITION_NAME = "metadata_partition"; + // List of all partitions + public static final String[] METADATA_ALL_PARTITIONS = {METADATA_PARTITION_NAME}; + // Key for the record which saves list of all partitions + static final String RECORDKEY_PARTITION_LIST = "__all_partitions__"; + + // Metric names + private static final String INITIALIZE_STR = "initialize"; + private static final String SYNC_STR = "sync"; + private static final String LOOKUP_PARTITIONS_STR = "lookup_partitions"; + private static final String LOOKUP_FILES_STR = "lookup_files"; + private static final String VALIDATE_PARTITIONS_STR = "validate_partitions"; + private static final String VALIDATE_FILES_STR = "validate_files"; + private static final String VALIDATE_ERRORS_STR = "validate_errors"; + private static final String SCAN_STR = "scan"; + + // Stats names + private static final String STAT_TOTAL_BASE_FILE_SIZE = "totalBaseFileSizeInBytes"; + private static final String STAT_TOTAL_LOG_FILE_SIZE = "totalLogFileSizeInBytes"; + private static final String STAT_COUNT_BASE_FILES = "baseFileCount"; + private static final String STAT_COUNT_LOG_FILES = "logFileCount"; + private static final String STAT_COUNT_PARTITION = "partitionCount"; + private static final String STAT_IN_SYNC = "isInSync"; + private static final String STAT_LAST_COMPACTION_TIMESTAMP = "lastCompactionTimestamp"; + + private final JavaSparkContext jsc; + private final Configuration hadoopConf; + private final String datasetBasePath; + private final String metadataBasePath; + private final HoodieWriteConfig config; + private final String tableName; + private final Schema schema; + private long maxMemorySizeInBytes = 1024 * 1024 * 1024; // TODO + private int bufferSize = 10 * 1024 * 1024; // TODO + private HoodieTableMetaClient metaClient; + private HoodieMetrics metrics; + private boolean validateLookups = false; + private int numLookupsAllPartitions = 0; + private int numLookupsFilesInPartition = 0; + private int numScans = 0; + private int numValidationErrors = 0; + // In read only mode, no changes are made to the metadata table + private boolean readOnly = true; + + // Readers for the base and log file which store the metadata + private HoodieFileReader<GenericRecord> basefileReader; + private HoodieMetadataMergedLogRecordScanner logRecordScanner; + + HoodieMetadataImpl(JavaSparkContext jsc, Configuration hadoopConf, String datasetBasePath, String metadataBasePath, + HoodieWriteConfig writeConfig, boolean readOnly) throws IOException { + this.jsc = jsc; + this.hadoopConf = hadoopConf; + this.datasetBasePath = datasetBasePath; + this.metadataBasePath = metadataBasePath; + this.readOnly = readOnly; + + this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX; + this.validateLookups = writeConfig.getFileListingMetadataVerify(); + + // Load the schema + String schemaStr; + try { + schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/metadataSchema.txt")); + this.schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); + } catch (IOException e) { + throw new HoodieMetadataException("Failed to read metadata schema", e); + } + + if (readOnly) { + this.config = null; + metaClient = new HoodieTableMetaClient(hadoopConf, metadataBasePath); + } else { + this.config = createMetadataWriteConfig(writeConfig, schemaStr); + this.metrics = new HoodieMetrics(this.config, this.config.getTableName()); + + // Inline compaction and auto clean is required as we dont expose this table outside + ValidationUtils.checkArgument(this.config.isAutoClean(), "Auto clean is required for Metadata Compaction config"); + ValidationUtils.checkArgument(this.config.isInlineCompaction(), "Inline compaction is required for Metadata Compaction config"); + // Metadata Table cannot have its metadata optimized + ValidationUtils.checkArgument(this.config.shouldAutoCommit(), "Auto commit is required for Metadata Table"); + ValidationUtils.checkArgument(!this.config.useFileListingMetadata(), "File listing cannot be used for Metadata Table"); + + initAndSyncMetadataTable(); + } + } + + /** + * Create a {@code HoodieWriteConfig} to use for the Metadata Table. + * + * @param writeConfig {@code HoodieWriteConfig} of the main dataset writer + * @param schemaStr Metadata Table schema + */ + private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfig, String schemaStr) throws IOException { + // Create the write config for the metadata table by borrowing options from the main write config. + HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() + .withTimelineLayoutVersion(writeConfig.getTimelineLayoutVersion()) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder() + .withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled()) + .withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs()) + .withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs()) + .withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks()) + .build()) + .withUseFileListingMetadata(false) + .withFileListingMetadataVerify(false) + .withAutoCommit(true) + .withAvroSchemaValidate(true) + .withEmbeddedTimelineServerEnabled(false) + .withAssumeDatePartitioning(false) + .withPath(metadataBasePath) + .withSchema(schemaStr) + .forTable(tableName) + .withParallelism(1, 1).withDeleteParallelism(1).withRollbackParallelism(1).withFinalizeWriteParallelism(1) + .withCompactionConfig(writeConfig.getMetadataCompactionConfig()); + + if (writeConfig.isMetricsOn()) { + HoodieMetricsConfig.Builder metricsConfig = HoodieMetricsConfig.newBuilder() + .withReporterType(writeConfig.getMetricsReporterType().toString()) + .on(true); + switch (writeConfig.getMetricsReporterType()) { + case GRAPHITE: + metricsConfig.onGraphitePort(writeConfig.getGraphiteServerPort()) + .toGraphiteHost(writeConfig.getGraphiteServerHost()) + .usePrefix(writeConfig.getGraphiteMetricPrefix()); + break; + case JMX: + metricsConfig.onJmxPort(writeConfig.getJmxPort()) + .toJmxHost(writeConfig.getJmxHost()); + break; + case DATADOG: + // TODO: + break; + case CONSOLE: + case INMEMORY: + break; + default: + throw new HoodieMetadataException("Unsupported Metrics Reporter type " + writeConfig.getMetricsReporterType()); + } + + builder.withMetricsConfig(metricsConfig.build()); + } + + return builder.build(); + } + + public HoodieWriteConfig getWriteConfig() { + return config; + } + + /** + * Reload the metadata table by syncing it based on the commits on the dataset. + * + * This is only allowed + */ + + public void reload() throws IOException { + initAndSyncMetadataTable(); + } + + /** + * Initialize the metadata table if it does not exist. Update the metadata to bring it in sync with the file system. + * + * This can happen in two ways: + * 1. If the metadata table did not exist, then file and partition listing is used + * 2. If the metadata table exists, the instants from active timeline are read in order and changes applied + * + * The above logic has been chosen because it is faster to perform #1 at scale rather than read all the Instants + * which are large in size (AVRO or JSON encoded and not compressed) and incur considerable IO for de-serialization + * and decoding. + */ + private void initAndSyncMetadataTable() throws IOException { + ValidationUtils.checkState(!readOnly, "Metadata table cannot be initialized in readonly mode"); + + final Timer.Context context = this.metrics.getMetadataCtx(INITIALIZE_STR); + + HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf, datasetBasePath); + FileSystem fs = FSUtils.getFs(metadataBasePath, hadoopConf); + boolean exists = fs.exists(new Path(metadataBasePath, HoodieTableMetaClient.METAFOLDER_NAME)); + + if (!exists) { + // Initialize for the first time by listing partitions and files directly from the file system + initFromFilesystem(datasetMetaClient); + } else { + metaClient = ClientUtils.createMetaClient(hadoopConf, config, true); + } + + /* + // TODO: We may not be able to sync in certain cases (instants archived etc) + //if (!canSync(datasetMetaClient)) { + // Need to recreate the table as sync has failed + // TODO: delete the table + // initFromFilesystem(datasetMetaClient); + //} + */ + + // This is always called even in case the table was created for the first time. This is because + // initFromFilesystem() does file listing and hence may take a long time during which some new updates + // may have occurred on the table. Hence, calling this always ensures that the metadata is brought in sync + // with the active timeline. + syncFromInstants(datasetMetaClient); + + // Publish some metrics + if (context != null) { + long durationInMs = metrics.getDurationInMs(context.stop()); + // Time to initilize and sync + if (exists) { + metrics.updateMetadataMetrics(INITIALIZE_STR, 0, 0); + metrics.updateMetadataMetrics(SYNC_STR, durationInMs, 1); + } else { + metrics.updateMetadataMetrics(INITIALIZE_STR, durationInMs, 1); + metrics.updateMetadataMetrics(SYNC_STR, 0, 0); + } + + // Total size of the metadata and count of base/log files + Map<String, String> stats = getStats(false); + metrics.updateMetadataSizeMetrics(Long.valueOf(stats.get(STAT_TOTAL_BASE_FILE_SIZE)), + Long.valueOf(stats.get(STAT_TOTAL_LOG_FILE_SIZE)), Integer.valueOf(stats.get(STAT_COUNT_BASE_FILES)), + Integer.valueOf(stats.get(STAT_COUNT_LOG_FILES))); + } + } + + /** + * Initialize the Metadata Table by listing files and partitions from the file system. + * + * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset + */ + private void initFromFilesystem(HoodieTableMetaClient datasetMetaClient) throws IOException { + ValidationUtils.checkState(!readOnly, "Metadata table cannot be initialized in readonly mode"); + + // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit + Option<HoodieInstant> latestInstant = datasetMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant(); + String createInstantTime = latestInstant.isPresent() ? latestInstant.get().getTimestamp() : SOLO_COMMIT_TIMESTAMP; + + LOG.info("Creating a new metadata table in " + metadataBasePath + " at instant " + createInstantTime); + metaClient = HoodieTableMetaClient.initTableType(hadoopConf, metadataBasePath.toString(), + HoodieTableType.MERGE_ON_READ, tableName, "archived", HoodieMetadataPayload.class.getName(), + HoodieFileFormat.HFILE.toString()); + + // List all partitions in the basePath of the containing dataset + FileSystem fs = FSUtils.getFs(datasetBasePath, hadoopConf); + List<String> partitions = FSUtils.getAllPartitionPaths(fs, datasetBasePath, false); + LOG.info("Initializing metadata table by using file listings in " + partitions.size() + " partitions"); + + // List all partitions in parallel and collect the files in them + final String dbasePath = datasetBasePath; + final SerializableConfiguration serializedConf = new SerializableConfiguration(hadoopConf); + int parallelism = Math.min(partitions.size(), 100) + 1; // +1 to ensure non zero + JavaPairRDD<String, FileStatus[]> partitionFileListRDD = jsc.parallelize(partitions, parallelism) + .mapToPair(partition -> { + FileSystem fsys = FSUtils.getFs(dbasePath, serializedConf.get()); + FileStatus[] statuses = FSUtils.getAllDataFilesInPartition(fsys, new Path(dbasePath, partition)); + return new Tuple2<>(partition, statuses); + }); + + // Collect the list of partitions and file lists + List<Tuple2<String, FileStatus[]>> partitionFileList = partitionFileListRDD.collect(); + + // Create a HoodieCommitMetadata with writeStats for all discovered files + int[] stats = {0}; + HoodieCommitMetadata metadata = new HoodieCommitMetadata(); + partitionFileList.forEach(t -> { + final String partition = t._1; + try { + if (!fs.exists(new Path(datasetBasePath, partition + Path.SEPARATOR + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))) { + return; + } + } catch (IOException e) { + throw new HoodieMetadataException("Failed to check partition " + partition, e); + } + + // If the partition has no files then create a writeStat with no file path + if (t._2.length == 0) { + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setPartitionPath(partition); + metadata.addWriteStat(partition, writeStat); + } else { + Arrays.stream(t._2).forEach(status -> { + String filename = status.getPath().getName(); + if (filename.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) { + return; + } + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setPath(partition + Path.SEPARATOR + filename); + writeStat.setPartitionPath(partition); + writeStat.setTotalWriteBytes(status.getLen()); + metadata.addWriteStat(partition, writeStat); + }); + } + stats[0] += t._2.length; + }); + + LOG.info("Committing " + partitionFileList.size() + " partitions and " + stats[0] + " files to metadata"); + update(metadata, createInstantTime); + } + + /** + * Sync the Metadata Table from the instants created on the dataset. + * + * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset + */ + private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) throws IOException { + ValidationUtils.checkState(!readOnly, "Metadata table cannot be synced in readonly mode"); + + List<HoodieInstant> instantsToSync = findInstantsToSync(datasetMetaClient); + if (instantsToSync.isEmpty()) { + return; + } + + LOG.info("Syncing " + instantsToSync.size() + " instants to metadata table: " + instantsToSync); + + // Read each instant in order and sync it to metadata table + final HoodieActiveTimeline timeline = datasetMetaClient.getActiveTimeline(); + for (HoodieInstant instant : instantsToSync) { + LOG.info("Syncing instant " + instant + " to metadata table"); + + switch (instant.getAction()) { + case HoodieTimeline.CLEAN_ACTION: { + // CLEAN is synced from the + // - inflight instant which contains the HoodieCleanerPlan, or + // - complete instant which contains the HoodieCleanMetadata + try { + HoodieInstant inflightCleanInstant = new HoodieInstant(true, instant.getAction(), instant.getTimestamp()); + ValidationUtils.checkArgument(inflightCleanInstant.isInflight()); + HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(datasetMetaClient, inflightCleanInstant); + update(cleanerPlan, instant.getTimestamp()); + } catch (HoodieIOException e) { + HoodieInstant cleanInstant = new HoodieInstant(false, instant.getAction(), instant.getTimestamp()); + ValidationUtils.checkArgument(cleanInstant.isCompleted()); + HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(datasetMetaClient, cleanInstant); + update(cleanMetadata, instant.getTimestamp()); + } + break; + } + case HoodieTimeline.DELTA_COMMIT_ACTION: + case HoodieTimeline.COMMIT_ACTION: + case HoodieTimeline.COMPACTION_ACTION: { + ValidationUtils.checkArgument(instant.isCompleted()); + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class); + update(commitMetadata, instant.getTimestamp()); + break; + } + case HoodieTimeline.ROLLBACK_ACTION: { + ValidationUtils.checkArgument(instant.isCompleted()); + HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata( + timeline.getInstantDetails(instant).get()); + update(rollbackMetadata, instant.getTimestamp()); + break; + } + case HoodieTimeline.RESTORE_ACTION: { + ValidationUtils.checkArgument(instant.isCompleted()); + HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata( + timeline.getInstantDetails(instant).get()); + update(restoreMetadata, instant.getTimestamp()); + break; + } + case HoodieTimeline.SAVEPOINT_ACTION: { + ValidationUtils.checkArgument(instant.isCompleted()); + // Nothing to be done here + break; + } + default: { + throw new HoodieException("Unknown type of action " + instant.getAction()); + } + } + } + } + + /** + * Return an ordered list of instants which have not been synced to the Metadata Table. + + * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset + */ + public List<HoodieInstant> findInstantsToSync(HoodieTableMetaClient datasetMetaClient) { + HoodieTimeline metaTimeline = metaClient.reloadActiveTimeline(); + + // All instants since the last time metadata table was compacted are candidates for sync + Option<String> compactionTimestamp = getLatestCompactionTimestamp(); + + // If there has not been any compaction then the first delta commit instant should be the one at which + // the metadata table was created. We should not sync any instants before that creation time. + Option<HoodieInstant> oldestMetaInstant = Option.empty(); + if (!compactionTimestamp.isPresent()) { + oldestMetaInstant = Option.fromJavaOptional(metaTimeline.getInstants() + .filter(i -> i.isCompleted() && i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)) + .findFirst()); + if (oldestMetaInstant.isPresent()) { + // TODO: Ensure this is the instant at which we created the metadata table + } + } + + String metaSyncTimestamp = compactionTimestamp.isPresent() ? compactionTimestamp.get() + : oldestMetaInstant.isPresent() ? oldestMetaInstant.get().getTimestamp() : ""; + + // Metadata table is updated when an instant is completed except for the following: + // CLEAN: metadata table is updated during inflight. So for CLEAN we accept inflight actions. + List<HoodieInstant> datasetInstants = datasetMetaClient.getActiveTimeline().getInstants() + .filter(i -> i.isCompleted() || (i.getAction().equals(HoodieTimeline.CLEAN_ACTION) && i.isInflight())) + .filter(i -> metaSyncTimestamp.isEmpty() + || HoodieTimeline.compareTimestamps(i.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, + metaSyncTimestamp)) + .collect(Collectors.toList()); + + // Each operation on dataset leads to a delta-commit on the metadata MOR table. So find only delta-commit + // instants in metadata table which are after the last compaction. + Map<String, HoodieInstant> metadataInstantMap = metaTimeline.getInstants() + .filter(i -> i.isCompleted() && i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION) + && (metaSyncTimestamp.isEmpty() || HoodieTimeline.compareTimestamps(i.getTimestamp(), + HoodieTimeline.GREATER_THAN_OR_EQUALS, + metaSyncTimestamp))) + .collect(Collectors.toMap(HoodieInstant::getTimestamp, Function.identity())); + + List<HoodieInstant> instantsToSync = new LinkedList<>(); + datasetInstants.forEach(instant -> { + if (metadataInstantMap.containsKey(instant.getTimestamp())) { + // instant already synced to metadata table + if (!instantsToSync.isEmpty()) { + LOG.warn("Found out-of-order already synced instant " + instant + ". Instants to sync=" + instantsToSync); + } + } else { + instantsToSync.add(instant); + } + }); + + // TODO: async clean and async compaction are not handled here. They have a timestamp which is in the past + // (when the operation was scheduled) and even on completion they retain their old timestamp. + return instantsToSync; + } + + /** + * Update from {@code HoodieCommitMetadata}. + * + * @param commitMetadata {@code HoodieCommitMetadata} + * @param instantTime Timestamp at which the commit was performed + */ + void update(HoodieCommitMetadata commitMetadata, String instantTime) { + List<HoodieRecord> records = new LinkedList<>(); + commitMetadata.getPartitionToWriteStats().forEach((partition, writeStats) -> { + Map<String, Long> newFiles = new HashMap<>(writeStats.size()); + writeStats.forEach(hoodieWriteStat -> { + String pathWithPartition = hoodieWriteStat.getPath(); + if (pathWithPartition == null) { + // Empty partition + return; + } + String filename = pathWithPartition.substring(partition.length() + 1); + ValidationUtils.checkState(!newFiles.containsKey(filename), "Duplicate files in HoodieCommitMetadata"); + newFiles.put(filename, hoodieWriteStat.getTotalWriteBytes()); + }); + + // New files added to a partition + HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.of(newFiles), + Option.empty()); + records.add(record); + }); + + // New partitions created + HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord( + new ArrayList<String>(commitMetadata.getPartitionToWriteStats().keySet())); + records.add(record); + + LOG.info("Updating at " + instantTime + " from Commit/" + commitMetadata.getOperationType() + + ". #partitions_updated=" + records.size()); + commit(prepRecords(records, METADATA_PARTITION_NAME), instantTime, true); + } + + /** + * Update from {@code HoodieCleanerPlan}. + * + * @param cleanerPlan {@code HoodieCleanerPlan} + * @param instantTime Timestamp at which the clean plan was generated + */ + void update(HoodieCleanerPlan cleanerPlan, String instantTime) { + // TODO: Cleaner update is called even before the operation has completed. Hence, this function may be called + // multiple times and we dont need to update metadata again. + HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); + long cnt = timeline.filterCompletedInstants().getInstants().filter(i -> i.getTimestamp().equals(instantTime)).count(); + if (cnt == 1) { + LOG.info("Ignoring update from cleaner plan for already completed instant " + instantTime); + return; + } + + List<HoodieRecord> records = new LinkedList<>(); + int[] fileDeleteCount = {0}; + cleanerPlan.getFilePathsToBeDeletedPerPartition().forEach((partition, deletedPathInfo) -> { + fileDeleteCount[0] += deletedPathInfo.size(); + + // Files deleted from a partition + List<String> deletedFilenames = deletedPathInfo.stream().map(p -> new Path(p.getFilePath()).getName()) + .collect(Collectors.toList()); + HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(), + Option.of(deletedFilenames)); + records.add(record); + }); + + LOG.info("Updating at " + instantTime + " from CleanerPlan. #partitions_updated=" + records.size() + + ", #files_deleted=" + fileDeleteCount[0]); + commit(prepRecords(records, METADATA_PARTITION_NAME), instantTime, true); + } + + /** + * Update from {@code HoodieCleanMetadata}. + * + * @param cleanMetadata {@code HoodieCleanMetadata} + * @param instantTime Timestamp at which the clean was completed + */ + private void update(HoodieCleanMetadata cleanMetadata, String instantTime) { + List<HoodieRecord> records = new LinkedList<>(); + int[] fileDeleteCount = {0}; + + cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> { + // Files deleted from a partition + List<String> deletedFiles = partitionMetadata.getSuccessDeleteFiles(); + HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(), + Option.of(new ArrayList<>(deletedFiles))); + + records.add(record); + fileDeleteCount[0] += deletedFiles.size(); + }); + + LOG.info("Updating at " + instantTime + " from Clean. #partitions_updated=" + records.size() + + ", #files_deleted=" + fileDeleteCount[0]); + commit(prepRecords(records, METADATA_PARTITION_NAME), instantTime, true); + } + + /** + * Update from {@code HoodieRestoreMetadata}. + * + * @param restoreMetadata {@code HoodieRestoreMetadata} + * @param instantTime Timestamp at which the restore was performed + */ + void update(HoodieRestoreMetadata restoreMetadata, String instantTime) { + Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>(); + Map<String, List<String>> partitionToDeletedFiles = new HashMap<>(); + restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> { + rms.forEach(rm -> processRollbackMetadata(rm, partitionToDeletedFiles, partitionToAppendedFiles)); + }); + + commitRollback(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Restore"); + } + + /** + * Update from {@code HoodieRollbackMetadata}. + * + * @param rollbackMetadata {@code HoodieRollbackMetadata} + * @param instantTime Timestamp at which the rollback was performed + */ + void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) { + Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>(); + Map<String, List<String>> partitionToDeletedFiles = new HashMap<>(); + processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles); + commitRollback(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Rollback"); + } + + /** + * Extracts information about the deleted and append files from the {@code HoodieRollbackMetadata}. + * + * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This + * function will extract this change file for each partition. + * + * @param rollbackMetadata {@code HoodieRollbackMetadata} + * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition. + * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes. + */ + private void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata, + Map<String, List<String>> partitionToDeletedFiles, + Map<String, Map<String, Long>> partitionToAppendedFiles) { + rollbackMetadata.getPartitionMetadata().values().forEach(pm -> { + final String partition = pm.getPartitionPath(); + + if (!pm.getSuccessDeleteFiles().isEmpty()) { + if (!partitionToDeletedFiles.containsKey(partition)) { + partitionToDeletedFiles.put(partition, new ArrayList<>()); + } + + // Extract deleted file name from the absolute paths saved in getSuccessDeleteFiles() + List<String> deletedFiles = pm.getSuccessDeleteFiles().stream().map(p -> new Path(p).getName()) + .collect(Collectors.toList()); + partitionToDeletedFiles.get(partition).addAll(deletedFiles); + } + + if (!pm.getAppendFiles().isEmpty()) { + if (!partitionToAppendedFiles.containsKey(partition)) { + partitionToAppendedFiles.put(partition, new HashMap<>()); + } + + // Extract appended file name from the absolute paths saved in getAppendFiles() + pm.getAppendFiles().forEach((path, size) -> { + partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, (oldSize, newSizeCopy) -> { + return size + oldSize; + }); + }); + } + }); + } + + /** + * Create file delete records and commit. + * + * @param partitionToDeletedFiles {@code Map} of partitions and the deleted files + * @param instantTime Timestamp at which the deletes took place + * @param operation Type of the operation which caused the files to be deleted + */ + private void commitRollback(Map<String, List<String>> partitionToDeletedFiles, + Map<String, Map<String, Long>> partitionToAppendedFiles, String instantTime, + String operation) { + List<HoodieRecord> records = new LinkedList<>(); + int[] fileChangeCount = {0, 0}; // deletes, appends + + partitionToDeletedFiles.forEach((partition, deletedFiles) -> { + // Rollbacks deletes instants from timeline. The instant being rolled-back may not have been synced to the + // metadata table. Hence, the deleted filed need to be checked against the metadata. + try { + FileStatus[] existingStatuses = getAllFilesInPartition(new Path(datasetBasePath, partition)); + Set<String> currentFiles = + Arrays.stream(existingStatuses).map(s -> s.getPath().getName()).collect(Collectors.toSet()); + + int origCount = deletedFiles.size(); + deletedFiles.removeIf(f -> !currentFiles.contains(f)); + if (deletedFiles.size() != origCount) { + LOG.warn("Some Files to be deleted as part of " + operation + " at " + instantTime + " were not found in the " + + " metadata for partition " + partition + + ". To delete = " + origCount + ", found=" + deletedFiles.size()); + } + + fileChangeCount[0] += deletedFiles.size(); + + Option<Map<String, Long>> filesAdded = Option.empty(); + if (partitionToAppendedFiles.containsKey(partition)) { + filesAdded = Option.of(partitionToAppendedFiles.remove(partition)); + } + + HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded, + Option.of(new ArrayList<>(deletedFiles))); + records.add(record); + } catch (IOException e) { + throw new HoodieMetadataException("Failed to commit rollback deletes at instant " + instantTime, e); + } + }); + + partitionToAppendedFiles.forEach((partition, appendedFileMap) -> { + fileChangeCount[1] += appendedFileMap.size(); + + // Validate that no appended file has been deleted + ValidationUtils.checkState( + !appendedFileMap.keySet().removeAll(partitionToDeletedFiles.getOrDefault(partition, Collections.emptyList())), + "Rollback file cannot both be appended and deleted"); + + // New files added to a partition + HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.of(appendedFileMap), + Option.empty()); + records.add(record); + }); + + LOG.info("Updating at " + instantTime + " from " + operation + ". #partitions_updated=" + records.size() + + ", #files_deleted=" + fileChangeCount[0] + ", #files_appended=" + fileChangeCount[1]); + commit(prepRecords(records, METADATA_PARTITION_NAME), instantTime, true); + } + + /** + * Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit. + * + * @param recordRDD The records to commit + * @param instantTime The timestamp of instant to create + * @param performUpdate If True, records are upserted. If False, records are inserted + */ + private synchronized void commit(JavaRDD<HoodieRecord> recordRDD, String instantTime, boolean performUpdate) { + ValidationUtils.checkState(!readOnly, "Metadata table cannot be committed in readonly mode"); + //ValidationUtils.checkArgument(!recordRDD.isEmpty()); + + // Close all readers + if (basefileReader != null) { + basefileReader.close(); + basefileReader = null; + } + logRecordScanner = null; + + try (HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config, true)) { + writeClient.startCommitWithTime(instantTime); + List<WriteStatus> statuses; + if (performUpdate) { + statuses = writeClient.upsertPreppedRecords(recordRDD, instantTime).collect(); + } else { + statuses = writeClient.insertPreppedRecords(recordRDD, instantTime).collect(); + } + statuses.forEach(writeStatus -> { + if (writeStatus.hasErrors()) { + throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime); + } + }); + } + } + + /** + * Tag each record with the location. + * + * Since we only read the latest base file in a partition, we tag the records with the instant time of the latest + * base file. + */ + private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName) { + HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf); + SliceView fsView = table.getSliceView(); + List<HoodieBaseFile> baseFiles = fsView.getLatestFileSlices(partitionName) + .map(s -> s.getBaseFile()) + .filter(b -> b.isPresent()) + .map(b -> b.get()) + .collect(Collectors.toList()); + + // All the metadata fits within a single base file + if (partitionName.equals(METADATA_PARTITION_NAME)) { + if (baseFiles.size() > 1) { + throw new HoodieMetadataException("Multiple base files found in metadata partition"); + } + } + + String fileId; + String instantTime; + if (!baseFiles.isEmpty()) { + fileId = baseFiles.get(0).getFileId(); + instantTime = baseFiles.get(0).getCommitTime(); + } else { + // If there is a log file then we can assume that it has the data + List<HoodieLogFile> logFiles = fsView.getLatestFileSlices(HoodieMetadataImpl.METADATA_PARTITION_NAME) + .map(s -> s.getLatestLogFile()) + .filter(b -> b.isPresent()) + .map(b -> b.get()) + .collect(Collectors.toList()); + if (logFiles.isEmpty()) { + // No base and log files. All are new inserts + return jsc.parallelize(records); + } + + fileId = logFiles.get(0).getFileId(); + instantTime = logFiles.get(0).getBaseCommitTime(); + } + + return jsc.parallelize(records).map(r -> r.setCurrentLocation(new HoodieRecordLocation(instantTime, fileId))); + } + + /** + * Returns a list of all partitions. + */ + List<String> getAllPartitionPaths() throws IOException { + final Timer.Context context = this.metrics.getMetadataCtx(LOOKUP_PARTITIONS_STR); Review comment: Initialized a HoodieMetrics object with metrics disabled for the read-only case. This should keep the code simple by not requiring if checks everywhere this.metrics is used. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org