prashantwason commented on a change in pull request #2064:
URL: https://github.com/apache/hudi/pull/2064#discussion_r497079467



##########
File path: hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataImpl.java
##########
@@ -0,0 +1,1064 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.ClientUtils;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SpillableMapUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieMetricsConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.metrics.HoodieMetrics;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import com.codahale.metrics.Timer;
+
+import scala.Tuple2;
+
+/**
+ * Metadata implementation which saves partition and file listing within an internal MOR table
+ * called Metadata Table. This table is created by listing files and partitions (first time) and kept in sync
+ * using the instants on the main dataset.
+ */
+public class HoodieMetadataImpl {
+  private static final Logger LOG = LogManager.getLogger(HoodieMetadataImpl.class);
+
+  // Table name suffix
+  private static final String METADATA_TABLE_NAME_SUFFIX = "_metadata";
+  // Timestamp to use for the initial commit when the base dataset has no completed commits yet.
+  private static final String SOLO_COMMIT_TIMESTAMP = "00000000000000";
+
+  // Name of partition which saves file listings
+  public static final String METADATA_PARTITION_NAME = "metadata_partition";
+  // List of all partitions
+  public static final String[] METADATA_ALL_PARTITIONS = {METADATA_PARTITION_NAME};
+  // Key for the record which saves list of all partitions
+  static final String RECORDKEY_PARTITION_LIST = "__all_partitions__";
+
+  // Metric names
+  private static final String INITIALIZE_STR = "initialize";
+  private static final String SYNC_STR = "sync";
+  private static final String LOOKUP_PARTITIONS_STR = "lookup_partitions";
+  private static final String LOOKUP_FILES_STR = "lookup_files";
+  private static final String VALIDATE_PARTITIONS_STR = "validate_partitions";
+  private static final String VALIDATE_FILES_STR = "validate_files";
+  private static final String VALIDATE_ERRORS_STR = "validate_errors";
+  private static final String SCAN_STR = "scan";
+
+  // Stats names
+  private static final String STAT_TOTAL_BASE_FILE_SIZE = "totalBaseFileSizeInBytes";
+  private static final String STAT_TOTAL_LOG_FILE_SIZE = "totalLogFileSizeInBytes";
+  private static final String STAT_COUNT_BASE_FILES = "baseFileCount";
+  private static final String STAT_COUNT_LOG_FILES = "logFileCount";
+  private static final String STAT_COUNT_PARTITION = "partitionCount";
+  private static final String STAT_IN_SYNC = "isInSync";
+  private static final String STAT_LAST_COMPACTION_TIMESTAMP = "lastCompactionTimestamp";
+
+  private final JavaSparkContext jsc;
+  private final Configuration hadoopConf;
+  private final String datasetBasePath;
+  private final String metadataBasePath;
+  private final HoodieWriteConfig config;
+  private final String tableName;
+  private final Schema schema;
+  private long maxMemorySizeInBytes = 1024 * 1024 * 1024; // TODO
+  private int bufferSize = 10 * 1024 * 1024; // TODO
+  private HoodieTableMetaClient metaClient;
+  private HoodieMetrics metrics;
+  private boolean validateLookups = false;
+  private int numLookupsAllPartitions = 0;
+  private int numLookupsFilesInPartition = 0;
+  private int numScans = 0;
+  private int numValidationErrors = 0;
+  // In read only mode, no changes are made to the metadata table
+  private boolean readOnly = true;
+
+  // Readers for the base and log file which store the metadata
+  private HoodieFileReader<GenericRecord> basefileReader;
+  private HoodieMetadataMergedLogRecordScanner logRecordScanner;
+
+  HoodieMetadataImpl(JavaSparkContext jsc, Configuration hadoopConf, String datasetBasePath, String metadataBasePath,
+                         HoodieWriteConfig writeConfig, boolean readOnly) throws IOException {
+    this.jsc = jsc;
+    this.hadoopConf = hadoopConf;
+    this.datasetBasePath = datasetBasePath;
+    this.metadataBasePath = metadataBasePath;
+    this.readOnly = readOnly;
+
+    this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
+    this.validateLookups = writeConfig.getFileListingMetadataVerify();
+
+    // Load the schema
+    String schemaStr;
+    try {
+      schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/metadataSchema.txt"));
+      this.schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
+    } catch (IOException e) {
+      throw new HoodieMetadataException("Failed to read metadata schema", e);
+    }
+
+    if (readOnly) {
+      this.config = null;
+      metaClient = new HoodieTableMetaClient(hadoopConf, metadataBasePath);
+    } else {
+      this.config = createMetadataWriteConfig(writeConfig, schemaStr);
+      this.metrics = new HoodieMetrics(this.config, this.config.getTableName());
+
+      // Inline compaction and auto clean are required as we don't expose this table outside
+      ValidationUtils.checkArgument(this.config.isAutoClean(), "Auto clean is required for Metadata Compaction config");
+      ValidationUtils.checkArgument(this.config.isInlineCompaction(), "Inline compaction is required for Metadata Compaction config");
+      ValidationUtils.checkArgument(this.config.shouldAutoCommit(), "Auto commit is required for Metadata Table");
+      // The Metadata Table cannot have its own metadata optimized, so file listing metadata must be disabled on it
+      ValidationUtils.checkArgument(!this.config.useFileListingMetadata(), "File listing cannot be used for Metadata Table");
+
+      initAndSyncMetadataTable();
+    }
+  }
+
+  /**
+   * Create a {@code HoodieWriteConfig} to use for the Metadata Table.
+   *
+   * @param writeConfig {@code HoodieWriteConfig} of the main dataset writer
+   * @param schemaStr Metadata Table schema
+   */
+  private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfig, String schemaStr) throws IOException {
+    // Create the write config for the metadata table by borrowing options from the main write config.
+    HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
+        .withTimelineLayoutVersion(writeConfig.getTimelineLayoutVersion())
+        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
+            .withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled())
+            .withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs())
+            .withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs())
+            .withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks())
+            .build())
+        .withUseFileListingMetadata(false)
+        .withFileListingMetadataVerify(false)
+        .withAutoCommit(true)
+        .withAvroSchemaValidate(true)
+        .withEmbeddedTimelineServerEnabled(false)
+        .withAssumeDatePartitioning(false)
+        .withPath(metadataBasePath)
+        .withSchema(schemaStr)
+        .forTable(tableName)
+        .withParallelism(1, 1).withDeleteParallelism(1).withRollbackParallelism(1).withFinalizeWriteParallelism(1)
+        .withCompactionConfig(writeConfig.getMetadataCompactionConfig());
+
+    if (writeConfig.isMetricsOn()) {
+      HoodieMetricsConfig.Builder metricsConfig = HoodieMetricsConfig.newBuilder()
+          .withReporterType(writeConfig.getMetricsReporterType().toString())
+          .on(true);
+      switch (writeConfig.getMetricsReporterType()) {
+        case GRAPHITE:
+          metricsConfig.onGraphitePort(writeConfig.getGraphiteServerPort())
+                       .toGraphiteHost(writeConfig.getGraphiteServerHost())
+                       .usePrefix(writeConfig.getGraphiteMetricPrefix());
+          break;
+        case JMX:
+          metricsConfig.onJmxPort(writeConfig.getJmxPort())
+                       .toJmxHost(writeConfig.getJmxHost());
+          break;
+        case DATADOG:
+          // TODO:
+          break;
+        case CONSOLE:
+        case INMEMORY:
+          break;
+        default:
+          throw new HoodieMetadataException("Unsupported Metrics Reporter type " + writeConfig.getMetricsReporterType());
+      }
+
+      builder.withMetricsConfig(metricsConfig.build());
+    }
+
+    return builder.build();
+  }
+
+  public HoodieWriteConfig getWriteConfig() {
+    return config;
+  }
+
+  /**
+   * Reload the metadata table by syncing it based on the commits on the dataset.
+   *
+   * This is only allowed when the metadata table is not in read-only mode.
+   */
+  public void reload() throws IOException {
+    initAndSyncMetadataTable();
+  }
+
+  /**
+   * Initialize the metadata table if it does not exist. Update the metadata to bring it in sync with the file system.
+   *
+   * This can happen in two ways:
+   * 1. If the metadata table did not exist, then file and partition listing is used
+   * 2. If the metadata table exists, the instants from active timeline are read in order and changes applied
+   *
+   * The above logic has been chosen because it is faster to perform #1 at scale rather than read all the Instants
+   * which are large in size (AVRO or JSON encoded and not compressed) and incur considerable IO for de-serialization
+   * and decoding.
+   */
+  private void initAndSyncMetadataTable() throws IOException {
+    ValidationUtils.checkState(!readOnly, "Metadata table cannot be initialized in readonly mode");
+
+    final Timer.Context context = this.metrics.getMetadataCtx(INITIALIZE_STR);
+
+    HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf, datasetBasePath);
+    FileSystem fs = FSUtils.getFs(metadataBasePath, hadoopConf);
+    boolean exists = fs.exists(new Path(metadataBasePath, HoodieTableMetaClient.METAFOLDER_NAME));
+
+    if (!exists) {
+      // Initialize for the first time by listing partitions and files directly from the file system
+      initFromFilesystem(datasetMetaClient);
+    } else {
+      metaClient = ClientUtils.createMetaClient(hadoopConf, config, true);
+    }
+
+    /*
+    // TODO: We may not be able to sync in certain cases (instants archived etc)
+    //if (!canSync(datasetMetaClient)) {
+      // Need to recreate the table as sync has failed
+      // TODO: delete the table
+    //  initFromFilesystem(datasetMetaClient);
+    //}
+    */
+
+    // This is always called, even when the table was just created for the first time. This is because
+    // initFromFilesystem() does file listing and hence may take a long time, during which some new updates
+    // may have occurred on the table. Hence, calling this always ensures that the metadata is brought in sync
+    // with the active timeline.
+    syncFromInstants(datasetMetaClient);
+
+    // Publish some metrics
+    if (context != null) {
+      long durationInMs = metrics.getDurationInMs(context.stop());
+      // Time to initialize and sync
+      if (exists) {
+        metrics.updateMetadataMetrics(INITIALIZE_STR, 0, 0);
+        metrics.updateMetadataMetrics(SYNC_STR, durationInMs, 1);
+      } else {
+        metrics.updateMetadataMetrics(INITIALIZE_STR, durationInMs, 1);
+        metrics.updateMetadataMetrics(SYNC_STR, 0, 0);
+      }
+
+      // Total size of the metadata and count of base/log files
+      Map<String, String> stats = getStats(false);
+      metrics.updateMetadataSizeMetrics(Long.valueOf(stats.get(STAT_TOTAL_BASE_FILE_SIZE)),
+          Long.valueOf(stats.get(STAT_TOTAL_LOG_FILE_SIZE)), Integer.valueOf(stats.get(STAT_COUNT_BASE_FILES)),
+          Integer.valueOf(stats.get(STAT_COUNT_LOG_FILES)));
+    }
+  }
+
+  /**
+   * Initialize the Metadata Table by listing files and partitions from the file system.
+   *
+   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+   */
+  private void initFromFilesystem(HoodieTableMetaClient datasetMetaClient) throws IOException {
+    ValidationUtils.checkState(!readOnly, "Metadata table cannot be initialized in readonly mode");
+
+    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
+    Option<HoodieInstant> latestInstant = datasetMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+    String createInstantTime = latestInstant.isPresent() ? latestInstant.get().getTimestamp() : SOLO_COMMIT_TIMESTAMP;
+
+    LOG.info("Creating a new metadata table in " + metadataBasePath + " at instant " + createInstantTime);
+    metaClient = HoodieTableMetaClient.initTableType(hadoopConf, metadataBasePath.toString(),
+        HoodieTableType.MERGE_ON_READ, tableName, "archived", HoodieMetadataPayload.class.getName(),
+        HoodieFileFormat.HFILE.toString());
+
+    // List all partitions in the basePath of the containing dataset
+    FileSystem fs = FSUtils.getFs(datasetBasePath, hadoopConf);
+    List<String> partitions = FSUtils.getAllPartitionPaths(fs, datasetBasePath, false);
+    LOG.info("Initializing metadata table by using file listings in " + partitions.size() + " partitions");
+
+    // List all partitions in parallel and collect the files in them
+    final String dbasePath = datasetBasePath;
+    final SerializableConfiguration serializedConf = new SerializableConfiguration(hadoopConf);
+    int parallelism = Math.min(partitions.size(), 100) + 1; // +1 to ensure non-zero
+    JavaPairRDD<String, FileStatus[]> partitionFileListRDD = jsc.parallelize(partitions, parallelism)
+        .mapToPair(partition -> {
+          FileSystem fsys = FSUtils.getFs(dbasePath, serializedConf.get());
+          FileStatus[] statuses = FSUtils.getAllDataFilesInPartition(fsys, new Path(dbasePath, partition));
+          return new Tuple2<>(partition, statuses);
+        });
+
+    // Collect the list of partitions and file lists
+    List<Tuple2<String, FileStatus[]>> partitionFileList = partitionFileListRDD.collect();
+
+    // Create a HoodieCommitMetadata with writeStats for all discovered files
+    int[] stats = {0};
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    partitionFileList.forEach(t -> {
+      final String partition = t._1;
+      try {
+        if (!fs.exists(new Path(datasetBasePath, partition + Path.SEPARATOR + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))) {
+          return;
+        }
+      } catch (IOException e) {
+        throw new HoodieMetadataException("Failed to check partition " + partition, e);
+      }
+
+      // If the partition has no files then create a writeStat with no file path
+      if (t._2.length == 0) {
+        HoodieWriteStat writeStat = new HoodieWriteStat();
+        writeStat.setPartitionPath(partition);
+        metadata.addWriteStat(partition, writeStat);
+      } else {
+        Arrays.stream(t._2).forEach(status -> {
+          String filename = status.getPath().getName();
+          if (filename.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
+            return;
+          }
+          HoodieWriteStat writeStat = new HoodieWriteStat();
+          writeStat.setPath(partition + Path.SEPARATOR + filename);
+          writeStat.setPartitionPath(partition);
+          writeStat.setTotalWriteBytes(status.getLen());
+          metadata.addWriteStat(partition, writeStat);
+        });
+      }
+      stats[0] += t._2.length;
+    });
+
+    LOG.info("Committing " + partitionFileList.size() + " partitions and " + 
stats[0] + " files to metadata");
+    update(metadata, createInstantTime);
+  }
+
+  /**
+   * Sync the Metadata Table from the instants created on the dataset.
+   *
+   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+   */
+  private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) throws IOException {
+    ValidationUtils.checkState(!readOnly, "Metadata table cannot be synced in readonly mode");
+
+    List<HoodieInstant> instantsToSync = findInstantsToSync(datasetMetaClient);
+    if (instantsToSync.isEmpty()) {
+      return;
+    }
+
+    LOG.info("Syncing " + instantsToSync.size() + " instants to metadata 
table: " + instantsToSync);
+
+    // Read each instant in order and sync it to metadata table
+    final HoodieActiveTimeline timeline = datasetMetaClient.getActiveTimeline();
+    for (HoodieInstant instant : instantsToSync) {
+      LOG.info("Syncing instant " + instant + " to metadata table");
+
+      switch (instant.getAction()) {
+        case HoodieTimeline.CLEAN_ACTION: {
+          // CLEAN is synced from the
+          // - inflight instant which contains the HoodieCleanerPlan, or
+          // - complete instant which contains the HoodieCleanMetadata
+          try {
+            HoodieInstant inflightCleanInstant = new HoodieInstant(true, instant.getAction(), instant.getTimestamp());
+            ValidationUtils.checkArgument(inflightCleanInstant.isInflight());
+            HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(datasetMetaClient, inflightCleanInstant);
+            update(cleanerPlan, instant.getTimestamp());
+          } catch (HoodieIOException e) {
+            HoodieInstant cleanInstant = new HoodieInstant(false, instant.getAction(), instant.getTimestamp());
+            ValidationUtils.checkArgument(cleanInstant.isCompleted());
+            HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(datasetMetaClient, cleanInstant);
+            update(cleanMetadata, instant.getTimestamp());
+          }
+          break;
+        }
+        case HoodieTimeline.DELTA_COMMIT_ACTION:
+        case HoodieTimeline.COMMIT_ACTION:
+        case HoodieTimeline.COMPACTION_ACTION: {
+          ValidationUtils.checkArgument(instant.isCompleted());
+          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+              timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+          update(commitMetadata, instant.getTimestamp());
+          break;
+        }
+        case HoodieTimeline.ROLLBACK_ACTION: {
+          ValidationUtils.checkArgument(instant.isCompleted());
+          HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
+              timeline.getInstantDetails(instant).get());
+          update(rollbackMetadata, instant.getTimestamp());
+          break;
+        }
+        case HoodieTimeline.RESTORE_ACTION: {
+          ValidationUtils.checkArgument(instant.isCompleted());
+          HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
+              timeline.getInstantDetails(instant).get());
+          update(restoreMetadata, instant.getTimestamp());
+          break;
+        }
+        case HoodieTimeline.SAVEPOINT_ACTION: {
+          ValidationUtils.checkArgument(instant.isCompleted());
+          // Nothing to be done here
+          break;
+        }
+        default: {
+          throw new HoodieException("Unknown type of action " + instant.getAction());
+        }
+      }
+    }
+  }
+
+  /**
+   * Return an ordered list of instants which have not been synced to the Metadata Table.
+   *
+   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+   */
+  public List<HoodieInstant> findInstantsToSync(HoodieTableMetaClient datasetMetaClient) {
+    HoodieTimeline metaTimeline = metaClient.reloadActiveTimeline();
+
+    // All instants since the last time metadata table was compacted are candidates for sync
+    Option<String> compactionTimestamp = getLatestCompactionTimestamp();
+
+    // If there has not been any compaction then the first delta commit instant should be the one at which
+    // the metadata table was created. We should not sync any instants before that creation time.
+    Option<HoodieInstant> oldestMetaInstant = Option.empty();
+    if (!compactionTimestamp.isPresent()) {
+      oldestMetaInstant = Option.fromJavaOptional(metaTimeline.getInstants()
+          .filter(i -> i.isCompleted() && i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION))
+          .findFirst());
+      if (oldestMetaInstant.isPresent()) {
+        // TODO: Ensure this is the instant at which we created the metadata table
+      }
+    }
+
+    String metaSyncTimestamp = compactionTimestamp.isPresent() ? compactionTimestamp.get()
+        : oldestMetaInstant.isPresent() ? oldestMetaInstant.get().getTimestamp() : "";
+
+    // Metadata table is updated when an instant is completed except for the following:
+    //  CLEAN: metadata table is updated during inflight. So for CLEAN we accept inflight actions.
+    List<HoodieInstant> datasetInstants = datasetMetaClient.getActiveTimeline().getInstants()
+        .filter(i -> i.isCompleted() || (i.getAction().equals(HoodieTimeline.CLEAN_ACTION) && i.isInflight()))
+        .filter(i -> metaSyncTimestamp.isEmpty()
+            || HoodieTimeline.compareTimestamps(i.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS,
+                metaSyncTimestamp))
+        .collect(Collectors.toList());
+
+    // Each operation on dataset leads to a delta-commit on the metadata MOR table. So find only delta-commit
+    // instants in metadata table which are after the last compaction.
+    Map<String, HoodieInstant> metadataInstantMap = metaTimeline.getInstants()
+        .filter(i -> i.isCompleted() && i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)

Review comment:
       Rewritten using findInstantsAfterOrEquals
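       For readers following this thread, here is a minimal sketch of what a `findInstantsAfterOrEquals`-based filter could look like. This is not the actual change in the PR; the method signature `findInstantsAfterOrEquals(String, int)` and the dropped inflight-CLEAN handling are assumptions made purely for illustration.

```java
       // Hypothetical sketch only: gather dataset instants at or after the last metadata sync point.
       // The inflight-CLEAN acceptance from the original filter would still need to be layered on top.
       List<HoodieInstant> datasetInstants = datasetMetaClient.getActiveTimeline()
           .filterCompletedInstants()
           .findInstantsAfterOrEquals(metaSyncTimestamp, Integer.MAX_VALUE)
           .getInstants()
           .collect(Collectors.toList());
```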




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

