prashantwason commented on a change in pull request #2064:
URL: https://github.com/apache/hudi/pull/2064#discussion_r497074074

##########
File path: hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataImpl.java
##########
@@ -0,0 +1,1064 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.ClientUtils;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SpillableMapUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieMetricsConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.metrics.HoodieMetrics;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import com.codahale.metrics.Timer;
+
+import scala.Tuple2;
+
+/**
+ * Metadata implementation which saves partition and file listing within an internal MOR table
+ * called Metadata Table. This table is created by listing files and partitions (first time) and kept in sync
+ * using the instants on the main dataset.
+ */
+public class HoodieMetadataImpl {
+  private static final Logger LOG = LogManager.getLogger(HoodieMetadataImpl.class);
+
+  // Table name suffix
+  private static final String METADATA_TABLE_NAME_SUFFIX = "_metadata";
+  // Timestamp for a commit when the base dataset had not had any commits yet.
+  private static final String SOLO_COMMIT_TIMESTAMP = "00000000000000";
+
+  // Name of the partition which saves file listings
+  public static final String METADATA_PARTITION_NAME = "metadata_partition";
+  // List of all partitions
+  public static final String[] METADATA_ALL_PARTITIONS = {METADATA_PARTITION_NAME};
+  // Key for the record which saves the list of all partitions
+  static final String RECORDKEY_PARTITION_LIST = "__all_partitions__";
+
+  // Metric names
+  private static final String INITIALIZE_STR = "initialize";
+  private static final String SYNC_STR = "sync";
+  private static final String LOOKUP_PARTITIONS_STR = "lookup_partitions";
+  private static final String LOOKUP_FILES_STR = "lookup_files";
+  private static final String VALIDATE_PARTITIONS_STR = "validate_partitions";
+  private static final String VALIDATE_FILES_STR = "validate_files";
+  private static final String VALIDATE_ERRORS_STR = "validate_errors";
+  private static final String SCAN_STR = "scan";
+
+  // Stats names
+  private static final String STAT_TOTAL_BASE_FILE_SIZE = "totalBaseFileSizeInBytes";
+  private static final String STAT_TOTAL_LOG_FILE_SIZE = "totalLogFileSizeInBytes";
+  private static final String STAT_COUNT_BASE_FILES = "baseFileCount";
+  private static final String STAT_COUNT_LOG_FILES = "logFileCount";
+  private static final String STAT_COUNT_PARTITION = "partitionCount";
+  private static final String STAT_IN_SYNC = "isInSync";
+  private static final String STAT_LAST_COMPACTION_TIMESTAMP = "lastCompactionTimestamp";
+
+  private final JavaSparkContext jsc;
+  private final Configuration hadoopConf;
+  private final String datasetBasePath;
+  private final String metadataBasePath;
+  private final HoodieWriteConfig config;
+  private final String tableName;
+  private final Schema schema;
+  private long maxMemorySizeInBytes = 1024 * 1024 * 1024; // TODO
+  private int bufferSize = 10 * 1024 * 1024; // TODO
+  private HoodieTableMetaClient metaClient;
+  private HoodieMetrics metrics;
+  private boolean validateLookups = false;
+  private int numLookupsAllPartitions = 0;
+  private int numLookupsFilesInPartition = 0;
+  private int numScans = 0;
+  private int numValidationErrors = 0;
+  // In read-only mode, no changes are made to the metadata table
+  private boolean readOnly = true;
+
+  // Readers for the base and log file which store the metadata
+  private HoodieFileReader<GenericRecord> basefileReader;
+  private HoodieMetadataMergedLogRecordScanner logRecordScanner;
+
+  HoodieMetadataImpl(JavaSparkContext jsc, Configuration hadoopConf, String datasetBasePath, String metadataBasePath,
+      HoodieWriteConfig writeConfig, boolean readOnly) throws IOException {
+    this.jsc = jsc;
+    this.hadoopConf = hadoopConf;
+    this.datasetBasePath = datasetBasePath;
+    this.metadataBasePath = metadataBasePath;
+    this.readOnly = readOnly;
+
+    this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
+    this.validateLookups = writeConfig.getFileListingMetadataVerify();
+
+    // Load the schema
+    String schemaStr;
+    try {
+      schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/metadataSchema.txt"));
+      this.schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
+    } catch (IOException e) {
+      throw new HoodieMetadataException("Failed to read metadata schema", e);
+    }
+
+    if (readOnly) {
+      this.config = null;
+      metaClient = new HoodieTableMetaClient(hadoopConf, metadataBasePath);
+    } else {
+      this.config = createMetadataWriteConfig(writeConfig, schemaStr);
+      this.metrics = new HoodieMetrics(this.config, this.config.getTableName());
+
+      // Inline compaction and auto clean are required as we don't expose this table outside
+      ValidationUtils.checkArgument(this.config.isAutoClean(), "Auto clean is required for Metadata Compaction config");
+      ValidationUtils.checkArgument(this.config.isInlineCompaction(), "Inline compaction is required for Metadata Compaction config");
+      // Metadata Table cannot have its metadata optimized
+      ValidationUtils.checkArgument(this.config.shouldAutoCommit(), "Auto commit is required for Metadata Table");
+      ValidationUtils.checkArgument(!this.config.useFileListingMetadata(), "File listing cannot be used for Metadata Table");
+
+      initAndSyncMetadataTable();
+    }
+  }
+
+  /**
+   * Create a {@code HoodieWriteConfig} to use for the Metadata Table.
+   *
+   * @param writeConfig {@code HoodieWriteConfig} of the main dataset writer
+   * @param schemaStr Metadata Table schema
+   */
+  private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfig, String schemaStr) throws IOException {
+    // Create the write config for the metadata table by borrowing options from the main write config.
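+    // Parallelism is fixed at 1 throughout, since the metadata table holds a single partition (METADATA_PARTITION_NAME).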
+    HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
+        .withTimelineLayoutVersion(writeConfig.getTimelineLayoutVersion())
+        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
+            .withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled())
+            .withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs())
+            .withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs())
+            .withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks())
+            .build())
+        .withUseFileListingMetadata(false)
+        .withFileListingMetadataVerify(false)
+        .withAutoCommit(true)
+        .withAvroSchemaValidate(true)
+        .withEmbeddedTimelineServerEnabled(false)
+        .withAssumeDatePartitioning(false)
+        .withPath(metadataBasePath)
+        .withSchema(schemaStr)
+        .forTable(tableName)
+        .withParallelism(1, 1).withDeleteParallelism(1).withRollbackParallelism(1).withFinalizeWriteParallelism(1)
+        .withCompactionConfig(writeConfig.getMetadataCompactionConfig());
+
+    if (writeConfig.isMetricsOn()) {
+      HoodieMetricsConfig.Builder metricsConfig = HoodieMetricsConfig.newBuilder()
+          .withReporterType(writeConfig.getMetricsReporterType().toString())
+          .on(true);
+      switch (writeConfig.getMetricsReporterType()) {
+        case GRAPHITE:
+          metricsConfig.onGraphitePort(writeConfig.getGraphiteServerPort())
+              .toGraphiteHost(writeConfig.getGraphiteServerHost())
+              .usePrefix(writeConfig.getGraphiteMetricPrefix());
+          break;
+        case JMX:
+          metricsConfig.onJmxPort(writeConfig.getJmxPort())
+              .toJmxHost(writeConfig.getJmxHost());
+          break;
+        case DATADOG:
+          // TODO:
+          break;
+        case CONSOLE:
+        case INMEMORY:
+          break;
+        default:
+          throw new HoodieMetadataException("Unsupported Metrics Reporter type " + writeConfig.getMetricsReporterType());
+      }
+
+      builder.withMetricsConfig(metricsConfig.build());
+    }
+
+    return builder.build();
+  }
+
+  public HoodieWriteConfig getWriteConfig() {
+    return config;
+  }
+
+  /**
+   * Reload the metadata table by syncing it based on the commits on the dataset.
+   *
+   * This is only allowed when the metadata table is not in read-only mode.
+   */
+  public void reload() throws IOException {
+    initAndSyncMetadataTable();
+  }
+
+  /**
+   * Initialize the metadata table if it does not exist. Update the metadata to bring it in sync with the file system.
+   *
+   * This can happen in two ways:
+   * 1. If the metadata table did not exist, then file and partition listing is used
+   * 2. If the metadata table exists, the instants from the active timeline are read in order and the changes applied
+   *
+   * The above logic has been chosen because it is faster to perform #1 at scale rather than read all the Instants,
+   * which are large in size (AVRO or JSON encoded and not compressed) and incur considerable IO for de-serialization
+   * and decoding.
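+   * In both cases, syncFromInstants() is invoked afterwards, since the initial file listing may take long
+   * enough for new commits to land on the dataset in the meantime.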
+   */
+  private void initAndSyncMetadataTable() throws IOException {
+    ValidationUtils.checkState(!readOnly, "Metadata table cannot be initialized in readonly mode");
+
+    final Timer.Context context = this.metrics.getMetadataCtx(INITIALIZE_STR);
+
+    HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf, datasetBasePath);
+    FileSystem fs = FSUtils.getFs(metadataBasePath, hadoopConf);
+    boolean exists = fs.exists(new Path(metadataBasePath, HoodieTableMetaClient.METAFOLDER_NAME));
+
+    if (!exists) {
+      // Initialize for the first time by listing partitions and files directly from the file system
+      initFromFilesystem(datasetMetaClient);
+    } else {
+      metaClient = ClientUtils.createMetaClient(hadoopConf, config, true);
+    }
+
+    /*
+    // TODO: We may not be able to sync in certain cases (instants archived etc.)
+    //if (!canSync(datasetMetaClient)) {
+      // Need to recreate the table as sync has failed
+      // TODO: delete the table
+      // initFromFilesystem(datasetMetaClient);
+    //}
+    */
+
+    // This is always called even in case the table was created for the first time. This is because
+    // initFromFilesystem() does file listing and hence may take a long time during which some new updates
+    // may have occurred on the table. Hence, calling this always ensures that the metadata is brought in sync
+    // with the active timeline.
+    syncFromInstants(datasetMetaClient);
+
+    // Publish some metrics
+    if (context != null) {
+      long durationInMs = metrics.getDurationInMs(context.stop());
+      // Time to initialize and sync
+      if (exists) {
+        metrics.updateMetadataMetrics(INITIALIZE_STR, 0, 0);
+        metrics.updateMetadataMetrics(SYNC_STR, durationInMs, 1);
+      } else {
+        metrics.updateMetadataMetrics(INITIALIZE_STR, durationInMs, 1);
+        metrics.updateMetadataMetrics(SYNC_STR, 0, 0);
+      }
+
+      // Total size of the metadata and count of base/log files
+      Map<String, String> stats = getStats(false);
+      metrics.updateMetadataSizeMetrics(Long.valueOf(stats.get(STAT_TOTAL_BASE_FILE_SIZE)),
+          Long.valueOf(stats.get(STAT_TOTAL_LOG_FILE_SIZE)), Integer.valueOf(stats.get(STAT_COUNT_BASE_FILES)),
+          Integer.valueOf(stats.get(STAT_COUNT_LOG_FILES)));
+    }
+  }
+
+  /**
+   * Initialize the Metadata Table by listing files and partitions from the file system.
+   *
+   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+   */
+  private void initFromFilesystem(HoodieTableMetaClient datasetMetaClient) throws IOException {
+    ValidationUtils.checkState(!readOnly, "Metadata table cannot be initialized in readonly mode");
+
+    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
+    Option<HoodieInstant> latestInstant = datasetMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+    String createInstantTime = latestInstant.isPresent() ?
+        latestInstant.get().getTimestamp() : SOLO_COMMIT_TIMESTAMP;
+
+    LOG.info("Creating a new metadata table in " + metadataBasePath + " at instant " + createInstantTime);
+    metaClient = HoodieTableMetaClient.initTableType(hadoopConf, metadataBasePath.toString(),
+        HoodieTableType.MERGE_ON_READ, tableName, "archived", HoodieMetadataPayload.class.getName(),
+        HoodieFileFormat.HFILE.toString());
+
+    // List all partitions in the basePath of the containing dataset
+    FileSystem fs = FSUtils.getFs(datasetBasePath, hadoopConf);
+    List<String> partitions = FSUtils.getAllPartitionPaths(fs, datasetBasePath, false);
+    LOG.info("Initializing metadata table by using file listings in " + partitions.size() + " partitions");
+
+    // List all partitions in parallel and collect the files in them
+    final String dbasePath = datasetBasePath;
+    final SerializableConfiguration serializedConf = new SerializableConfiguration(hadoopConf);
+    int parallelism = Math.min(partitions.size(), 100) + 1; // +1 to ensure non zero
+    JavaPairRDD<String, FileStatus[]> partitionFileListRDD = jsc.parallelize(partitions, parallelism)
+        .mapToPair(partition -> {
+          FileSystem fsys = FSUtils.getFs(dbasePath, serializedConf.get());
+          FileStatus[] statuses = FSUtils.getAllDataFilesInPartition(fsys, new Path(dbasePath, partition));
+          return new Tuple2<>(partition, statuses);
+        });
+
+    // Collect the list of partitions and file lists
+    List<Tuple2<String, FileStatus[]>> partitionFileList = partitionFileListRDD.collect();
+
+    // Create a HoodieCommitMetadata with writeStats for all discovered files
+    int[] stats = {0};
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    partitionFileList.forEach(t -> {
+      final String partition = t._1;
+      try {
+        if (!fs.exists(new Path(datasetBasePath, partition + Path.SEPARATOR + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))) {
+          return;
+        }
+      } catch (IOException e) {
+        throw new HoodieMetadataException("Failed to check partition " + partition, e);
+      }
+
+      // If the partition has no files then create a writeStat with no file path
+      if (t._2.length == 0) {
+        HoodieWriteStat writeStat = new HoodieWriteStat();
+        writeStat.setPartitionPath(partition);
+        metadata.addWriteStat(partition, writeStat);
+      } else {
+        Arrays.stream(t._2).forEach(status -> {
+          String filename = status.getPath().getName();
+          if (filename.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
+            return;
+          }
+          HoodieWriteStat writeStat = new HoodieWriteStat();
+          writeStat.setPath(partition + Path.SEPARATOR + filename);
+          writeStat.setPartitionPath(partition);
+          writeStat.setTotalWriteBytes(status.getLen());
+          metadata.addWriteStat(partition, writeStat);
+        });
+      }
+      stats[0] += t._2.length;
+    });
+
+    LOG.info("Committing " + partitionFileList.size() + " partitions and " + stats[0] + " files to metadata");
+    update(metadata, createInstantTime);
+  }
+
+  /**
+   * Sync the Metadata Table from the instants created on the dataset.
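+   * Each instant is applied according to its action type: commits, delta commits and compactions carry
+   * HoodieCommitMetadata, while cleans, rollbacks and restores carry their own plan/metadata records.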
+   *
+   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+   */
+  private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) throws IOException {
+    ValidationUtils.checkState(!readOnly, "Metadata table cannot be synced in readonly mode");
+
+    List<HoodieInstant> instantsToSync = findInstantsToSync(datasetMetaClient);
+    if (instantsToSync.isEmpty()) {
+      return;
+    }
+
+    LOG.info("Syncing " + instantsToSync.size() + " instants to metadata table: " + instantsToSync);
+
+    // Read each instant in order and sync it to metadata table
+    final HoodieActiveTimeline timeline = datasetMetaClient.getActiveTimeline();
+    for (HoodieInstant instant : instantsToSync) {
+      LOG.info("Syncing instant " + instant + " to metadata table");
+
+      switch (instant.getAction()) {
+        case HoodieTimeline.CLEAN_ACTION: {
+          // CLEAN is synced from the
+          // - inflight instant which contains the HoodieCleanerPlan, or
+          // - complete instant which contains the HoodieCleanMetadata
+          try {
+            HoodieInstant inflightCleanInstant = new HoodieInstant(true, instant.getAction(), instant.getTimestamp());
+            ValidationUtils.checkArgument(inflightCleanInstant.isInflight());
+            HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(datasetMetaClient, inflightCleanInstant);
+            update(cleanerPlan, instant.getTimestamp());
+          } catch (HoodieIOException e) {
+            HoodieInstant cleanInstant = new HoodieInstant(false, instant.getAction(), instant.getTimestamp());
+            ValidationUtils.checkArgument(cleanInstant.isCompleted());
+            HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(datasetMetaClient, cleanInstant);
+            update(cleanMetadata, instant.getTimestamp());
+          }
+          break;
+        }
+        case HoodieTimeline.DELTA_COMMIT_ACTION:
+        case HoodieTimeline.COMMIT_ACTION:
+        case HoodieTimeline.COMPACTION_ACTION: {
+          ValidationUtils.checkArgument(instant.isCompleted());
+          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+              timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+          update(commitMetadata, instant.getTimestamp());
+          break;
+        }
+        case HoodieTimeline.ROLLBACK_ACTION: {
+          ValidationUtils.checkArgument(instant.isCompleted());
+          HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
+              timeline.getInstantDetails(instant).get());
+          update(rollbackMetadata, instant.getTimestamp());
+          break;
+        }
+        case HoodieTimeline.RESTORE_ACTION: {
+          ValidationUtils.checkArgument(instant.isCompleted());
+          HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
+              timeline.getInstantDetails(instant).get());
+          update(restoreMetadata, instant.getTimestamp());
+          break;
+        }
+        case HoodieTimeline.SAVEPOINT_ACTION: {
+          ValidationUtils.checkArgument(instant.isCompleted());
+          // Nothing to be done here
+          break;
+        }
+        default: {
+          throw new HoodieException("Unknown type of action " + instant.getAction());
+        }
+      }
+    }
+  }
+
+  /**
+   * Return an ordered list of instants which have not been synced to the Metadata Table.
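+   * Candidates are the dataset instants after the latest compaction of the metadata table; if the table has
+   * never been compacted, instants before the metadata table's creation (its first delta commit) are skipped.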
+   *
+   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+   */
+  public List<HoodieInstant> findInstantsToSync(HoodieTableMetaClient datasetMetaClient) {
+    HoodieTimeline metaTimeline = metaClient.reloadActiveTimeline();
+
+    // All instants since the last time metadata table was compacted are candidates for sync
+    Option<String> compactionTimestamp = getLatestCompactionTimestamp();
+
+    // If there has not been any compaction then the first delta commit instant should be the one at which
+    // the metadata table was created. We should not sync any instants before that creation time.
+    Option<HoodieInstant> oldestMetaInstant = Option.empty();
+    if (!compactionTimestamp.isPresent()) {
+      oldestMetaInstant = Option.fromJavaOptional(metaTimeline.getInstants()
+          .filter(i -> i.isCompleted() && i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION))

Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use
the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org