This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d4106f35b4aee53ea5cb1430288f397b37c81183
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Sun Feb 12 03:30:10 2023 -0800

    [HUDI-5764] Rollback delta commits from `HoodieIndexer` lazily in metadata table (#7921)

    Fixes two issues:
    - Makes the rollback of indexing delta commits in the metadata table lazy;
      otherwise, they would be cleaned up eagerly by regular writes.
    - Uses a suffix ("004") appended to the up-to-instant used by the async
      indexer, to avoid collision with an existing completed delta commit of
      the same instant time.
---
 .../hudi/client/BaseHoodieTableServiceClient.java  |  48 +++++++++
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  13 +++
 .../metadata/HoodieBackedTableMetadataWriter.java  |  34 ++++---
 .../java/org/apache/hudi/table/HoodieTable.java    |  38 +++++++-
 .../table/action/index/RunIndexActionExecutor.java |   5 +-
 .../FlinkHoodieBackedTableMetadataWriter.java      |  21 +++-
 .../org/apache/hudi/table/HoodieFlinkTable.java    |  12 ++-
 .../SparkHoodieBackedTableMetadataWriter.java      |  20 +++-
 .../org/apache/hudi/table/HoodieSparkTable.java    |  10 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  12 ++-
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  20 ++++
 .../apache/hudi/utilities/TestHoodieIndexer.java   | 108 +++++++++++++++++++--
 12 files changed, 298 insertions(+), 43 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 390bc4b9714..301ed61bf4e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -48,6 +48,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -71,6 +72,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.isIndexingCommit;
 
 public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient implements RunsTableService {
 
@@ -659,8 +661,41 @@ public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient i
     return infoMap;
   }
 
+  /**
+   * Rolls back the failed delta commits corresponding to the indexing action.
+   * Such delta commits are identified based on the suffix `METADATA_INDEXER_TIME_SUFFIX` ("004").
+   * <p>
+   * TODO(HUDI-5733): This should be cleaned up once the proper fix of rollbacks
+   * in the metadata table is landed.
+   *
+   * @return {@code true} if rollback happens; {@code false} otherwise.
+   */
+  protected boolean rollbackFailedIndexingCommits() {
+    HoodieTable table = createTable(config, hadoopConf);
+    List<String> instantsToRollback = getFailedIndexingCommitsToRollback(table.getMetaClient());
+    Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient());
+    instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
+    rollbackFailedWrites(pendingRollbacks);
+    return !pendingRollbacks.isEmpty();
+  }
+
+  protected List<String> getFailedIndexingCommitsToRollback(HoodieTableMetaClient metaClient) {
+    Stream<HoodieInstant> inflightInstantsStream = metaClient.getCommitsTimeline()
+        .filter(instant -> !instant.isCompleted()
+            && isIndexingCommit(instant.getTimestamp()))
+        .getInstantsAsStream();
+    return inflightInstantsStream.filter(instant -> {
+      try {
+        return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
+      } catch (IOException io) {
+        throw new HoodieException("Failed to check heartbeat for instant " + instant, io);
+      }
+    }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+  }
+
   /**
    * Rollback all failed writes.
+   *
    * @return true if rollback was triggered. false otherwise.
    */
   protected Boolean rollbackFailedWrites() {
@@ -699,6 +734,19 @@ public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient i
     Stream<HoodieInstant> inflightInstantsStream = getInflightTimelineExcludeCompactionAndClustering(metaClient)
         .getReverseOrderedInstants();
     if (cleaningPolicy.isEager()) {
+      // Metadata table uses eager cleaning policy, but we need to exclude inflight delta commits
+      // from the async indexer (`HoodieIndexer`).
+      // TODO(HUDI-5733): This should be cleaned up once the proper fix of rollbacks in the
+      // metadata table is landed.
+      if (HoodieTableMetadata.isMetadataTable(metaClient.getBasePathV2().toString())) {
+        return inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> {
+          if (curInstantTime.isPresent()) {
+            return !entry.equals(curInstantTime.get());
+          } else {
+            return !isIndexingCommit(entry);
+          }
+        }).collect(Collectors.toList());
+      }
       return inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> {
         if (curInstantTime.isPresent()) {
           return !entry.equals(curInstantTime.get());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 17956479762..10c0db97151 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -1279,8 +1279,21 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
     }
   }
 
+  /**
+   * Rolls back the failed delta commits corresponding to the indexing action.
+   * <p>
+   * TODO(HUDI-5733): This should be cleaned up once the proper fix of rollbacks
+   * in the metadata table is landed.
+   *
+   * @return {@code true} if rollback happens; {@code false} otherwise.
+   */
+  public boolean lazyRollbackFailedIndexing() {
+    return tableServiceClient.rollbackFailedIndexingCommits();
+  }
+
   /**
    * Rollback failed writes if any.
+   *
    * @return true if rollback happened. false otherwise.
    */
   public boolean rollbackFailedWrites() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 5e8367e2095..3338872efbb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -93,6 +93,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
 import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.getIndexInflightInstant;
@@ -100,6 +101,7 @@ import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deseri
 import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
 import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
 import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.METADATA_INDEXER_TIME_SUFFIX;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
 
@@ -134,15 +136,17 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
   /**
    * Hudi backed table metadata writer.
    *
-   * @param hadoopConf - Hadoop configuration to use for the metadata writer
-   * @param writeConfig - Writer config
-   * @param engineContext - Engine context
-   * @param actionMetadata - Optional action metadata to help decide initialize operations
-   * @param <T> - Action metadata types extending Avro generated SpecificRecordBase
-   * @param inflightInstantTimestamp - Timestamp of any instant in progress
+   * @param hadoopConf                 Hadoop configuration to use for the metadata writer
+   * @param writeConfig                Writer config
+   * @param failedWritesCleaningPolicy Cleaning policy on failed writes
+   * @param engineContext              Engine context
+   * @param actionMetadata             Optional action metadata to help decide initialize operations
+   * @param <T>                        Action metadata types extending Avro generated SpecificRecordBase
+   * @param inflightInstantTimestamp   Timestamp of any instant in progress
    */
   protected <T extends SpecificRecordBase> HoodieBackedTableMetadataWriter(Configuration hadoopConf,
                                                                            HoodieWriteConfig writeConfig,
+                                                                           HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
                                                                            HoodieEngineContext engineContext,
                                                                            Option<T> actionMetadata,
                                                                            Option<String> inflightInstantTimestamp) {
@@ -154,7 +158,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
 
     if (writeConfig.isMetadataTableEnabled()) {
       this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
-      this.metadataWriteConfig = createMetadataWriteConfig(writeConfig);
+      this.metadataWriteConfig = createMetadataWriteConfig(writeConfig, failedWritesCleaningPolicy);
       enabled = true;
 
       // Inline compaction and auto clean is required as we dont expose this table outside
@@ -181,7 +185,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
   public HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig,
                                          HoodieEngineContext engineContext) {
-    this(hadoopConf, writeConfig, engineContext, Option.empty(), Option.empty());
+    this(hadoopConf, writeConfig, EAGER, engineContext, Option.empty(), Option.empty());
   }
 
   /**
@@ -232,11 +236,14 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
   protected abstract void initRegistry();
 
   /**
-   * Create a {@code HoodieWriteConfig} to use for the Metadata Table.
+   * Create a {@code HoodieWriteConfig} to use for the Metadata Table. This is used by async
+   * indexer only.
    *
-   * @param writeConfig {@code HoodieWriteConfig} of the main dataset writer
+   * @param writeConfig                {@code HoodieWriteConfig} of the main dataset writer
+   * @param failedWritesCleaningPolicy Cleaning policy on failed writes
    */
-  private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfig) {
+  private HoodieWriteConfig createMetadataWriteConfig(
+      HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy) {
     int parallelism = writeConfig.getMetadataInsertParallelism();
     int minCommitsToKeep = Math.max(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMinCommitsToKeep());
@@ -268,7 +275,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
             .withAutoClean(false)
             .withCleanerParallelism(parallelism)
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
-            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
+            .withFailedWritesCleaningPolicy(failedWritesCleaningPolicy)
             .retainCommits(HoodieMetadataConfig.CLEANER_COMMITS_RETAINED.defaultValue())
             .build())
         // we will trigger archive manually, to ensure only regular writer invokes it
@@ -875,7 +882,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     inflightIndexes.addAll(indexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()));
     dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT.key(), String.join(",", inflightIndexes));
     HoodieTableConfig.update(dataMetaClient.getFs(), new Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
-    initialCommit(indexUptoInstantTime, partitionTypes);
+    initialCommit(indexUptoInstantTime + METADATA_INDEXER_TIME_SUFFIX, partitionTypes);
   }
 
   /**
@@ -1069,6 +1076,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
       // delta commits synced over will not have an instant time lesser than the last completed instant on the
       // metadata table.
       writeClient.clean(instantTime + "002");
+      writeClient.lazyRollbackFailedIndexing();
   }
 
   /**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 591ebc430dc..4102515ae01 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -41,6 +41,7 @@ import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
 import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -100,6 +101,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.avro.AvroSchemaUtils.isSchemaCompatible;
+import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
+import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.LAZY;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
 import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
@@ -872,7 +875,8 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
    * @return instance of {@link HoodieTableMetadataWriter}
    */
   public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp) {
-    return getMetadataWriter(triggeringInstantTimestamp, Option.empty());
+    return getMetadataWriter(
+        triggeringInstantTimestamp, EAGER, Option.empty());
   }
 
   /**
@@ -895,6 +899,29 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
     }
   }
 
+  /**
+   * Gets the metadata writer for async indexer.
+   *
+   * @param triggeringInstantTimestamp The instant that is triggering this metadata write.
+   * @return An instance of {@link HoodieTableMetadataWriter}.
+   */
+  public Option<HoodieTableMetadataWriter> getIndexingMetadataWriter(String triggeringInstantTimestamp) {
+    return getMetadataWriter(triggeringInstantTimestamp, LAZY, Option.empty());
+  }
+
+  /**
+   * Gets the metadata writer for regular writes.
+   *
+   * @param triggeringInstantTimestamp The instant that is triggering this metadata write.
+   * @param actionMetadata             Optional action metadata.
+   * @param <R>                        Action metadata type.
+   * @return An instance of {@link HoodieTableMetadataWriter}.
+   */
+  public <R extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(
+      String triggeringInstantTimestamp, Option<R> actionMetadata) {
+    return getMetadataWriter(triggeringInstantTimestamp, EAGER, actionMetadata);
+  }
+
   /**
    * Get Table metadata writer.
    * <p>
@@ -905,11 +932,14 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
    * are blocked from doing the similar initial metadata table creation and
    * the bootstrapping.
    *
-   * @param triggeringInstantTimestamp - The instant that is triggering this metadata write
+   * @param triggeringInstantTimestamp The instant that is triggering this metadata write
+   * @param failedWritesCleaningPolicy Cleaning policy on failed writes
    * @return instance of {@link HoodieTableMetadataWriter}
    */
-  public <R extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
-                                                                                            Option<R> actionMetadata) {
+  protected <R extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(
+      String triggeringInstantTimestamp,
+      HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+      Option<R> actionMetadata) {
     // Each engine is expected to override this and
     // provide the actual metadata writer, if enabled.
     return Option.empty();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index 2fcbfb2b2e5..19aab3629d5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -138,8 +138,9 @@ public class RunIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I,
     List<HoodieIndexPartitionInfo> finalIndexPartitionInfos = null;
     if (!firstTimeInitializingMetadataTable) {
       // start indexing for each partition
-      HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
-          .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
+      HoodieTableMetadataWriter metadataWriter = table.getIndexingMetadataWriter(instantTime)
+          .orElseThrow(() -> new HoodieIndexException(String.format(
+              "Could not get metadata writer to run index action for instant: %s", instantTime)));
       // this will only build index upto base instant as generated by the plan, we will be doing catchup later
       String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
       LOG.info("Starting Index Building with base instant: " + indexUptoInstant);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index aa70f5835c8..cd45685e13e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -42,6 +43,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
+
 /**
  * Flink hoodie backed table metadata writer.
  */
@@ -58,23 +61,35 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
                                                                                 HoodieWriteConfig writeConfig,
                                                                                 HoodieEngineContext context,
                                                                                 Option<T> actionMetadata) {
-    return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata, Option.empty());
+    return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, EAGER, context, actionMetadata, Option.empty());
+  }
+
+  public static <T extends SpecificRecordBase> HoodieTableMetadataWriter create(Configuration conf,
+                                                                                HoodieWriteConfig writeConfig,
+                                                                                HoodieEngineContext context,
+                                                                                Option<T> actionMetadata,
+                                                                                Option<String> inFlightInstantTimestamp) {
+    return new FlinkHoodieBackedTableMetadataWriter(
+        conf, writeConfig, EAGER, context, actionMetadata, inFlightInstantTimestamp);
   }
 
   public static <T extends SpecificRecordBase> HoodieTableMetadataWriter create(Configuration conf,
                                                                                 HoodieWriteConfig writeConfig,
+                                                                                HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
                                                                                 HoodieEngineContext context,
                                                                                 Option<T> actionMetadata,
                                                                                 Option<String> inFlightInstantTimestamp) {
-    return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata, inFlightInstantTimestamp);
+    return new FlinkHoodieBackedTableMetadataWriter(
+        conf, writeConfig, failedWritesCleaningPolicy, context, actionMetadata, inFlightInstantTimestamp);
   }
 
   <T extends SpecificRecordBase> FlinkHoodieBackedTableMetadataWriter(Configuration hadoopConf,
                                                                       HoodieWriteConfig writeConfig,
+                                                                      HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
                                                                       HoodieEngineContext engineContext,
                                                                       Option<T> actionMetadata,
                                                                       Option<String> inFlightInstantTimestamp) {
-    super(hadoopConf, writeConfig, engineContext, actionMetadata, inFlightInstantTimestamp);
+    super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, actionMetadata, inFlightInstantTimestamp);
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 3d77844df6f..422fe310b0c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -98,11 +99,14 @@ public abstract class HoodieFlinkTable<T>
    * @return instance of {@link HoodieTableMetadataWriter}
    */
   @Override
-  public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
-                                                                                            Option<T> actionMetadata) {
+  protected <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(
+      String triggeringInstantTimestamp,
+      HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+      Option<T> actionMetadata) {
     if (config.isMetadataTableEnabled()) {
-      return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
-          context, actionMetadata, Option.of(triggeringInstantTimestamp)));
+      return Option.of(FlinkHoodieBackedTableMetadataWriter.create(
+          context.getHadoopConf().get(), config, failedWritesCleaningPolicy, context,
+          actionMetadata, Option.of(triggeringInstantTimestamp)));
     } else {
       return Option.empty();
     }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 23537f6f798..37222c8266a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -49,6 +50,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
+
 public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {
 
   private static final Logger LOG = LogManager.getLogger(SparkHoodieBackedTableMetadataWriter.class);
@@ -73,10 +76,20 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
                                                                                 HoodieEngineContext context,
                                                                                 Option<T> actionMetadata,
                                                                                 Option<String> inflightInstantTimestamp) {
-    return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata,
-        inflightInstantTimestamp);
+    return new SparkHoodieBackedTableMetadataWriter(
+        conf, writeConfig, EAGER, context, actionMetadata, inflightInstantTimestamp);
   }
 
+  public static <T extends SpecificRecordBase> HoodieTableMetadataWriter create(Configuration conf,
+                                                                                HoodieWriteConfig writeConfig,
+                                                                                HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+                                                                                HoodieEngineContext context,
+                                                                                Option<T> actionMetadata,
+                                                                                Option<String> inflightInstantTimestamp) {
+    return new SparkHoodieBackedTableMetadataWriter(
+        conf, writeConfig, failedWritesCleaningPolicy, context, actionMetadata, inflightInstantTimestamp);
+  }
+
   public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context) {
     return create(conf, writeConfig, context, Option.empty(), Option.empty());
@@ -84,10 +97,11 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
   <T extends SpecificRecordBase> SparkHoodieBackedTableMetadataWriter(Configuration hadoopConf,
                                                                       HoodieWriteConfig writeConfig,
+                                                                      HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
                                                                       HoodieEngineContext engineContext,
                                                                       Option<T> actionMetadata,
                                                                       Option<String> inflightInstantTimestamp) {
-    super(hadoopConf, writeConfig, engineContext, actionMetadata, inflightInstantTimestamp);
+    super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, actionMetadata, inflightInstantTimestamp);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index 112deccf8df..d6796a7a4d4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -89,14 +90,17 @@ public abstract class HoodieSparkTable<T>
    * @return instance of {@link HoodieTableMetadataWriter}
    */
   @Override
-  public <R extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
-                                                                                            Option<R> actionMetadata) {
+  protected <R extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(
+      String triggeringInstantTimestamp,
+      HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+      Option<R> actionMetadata) {
     if (config.isMetadataTableEnabled()) {
       // Create the metadata table writer. First time after the upgrade this creation might trigger
       // metadata table bootstrapping. Bootstrapping process could fail and checking the table
       // existence after the creation is needed.
       final HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(
-          context.getHadoopConf().get(), config, context, actionMetadata, Option.of(triggeringInstantTimestamp));
+          context.getHadoopConf().get(), config, failedWritesCleaningPolicy, context,
+          actionMetadata, Option.of(triggeringInstantTimestamp));
       // even with metadata enabled, some index could have been disabled
       // delete metadata partitions corresponding to such indexes
       deleteMetadataIndexIfNecessary();
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 1ccc14176a1..f3538127955 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -53,9 +53,9 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.TableNotFoundException;
-import org.apache.hudi.util.Transient;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieSeekingFileReader;
+import org.apache.hudi.util.Transient;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -83,6 +83,7 @@ import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BL
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FILES;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileSystemView;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.isIndexingCommit;
 
 /**
  * Table metadata provided by an internal DFS backed Hudi metadata table.
@@ -482,6 +483,15 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     Set<String> validInstantTimestamps = datasetTimeline.filterCompletedInstants().getInstantsAsStream()
         .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
 
+    // We should also add completed indexing delta commits in the metadata table, as they do not
+    // have corresponding completed instants in the data table
+    validInstantTimestamps.addAll(
+        metadataMetaClient.getActiveTimeline()
+            .filter(instant -> instant.isCompleted() && isIndexingCommit(instant.getTimestamp()))
+            .getInstants().stream()
+            .map(HoodieInstant::getTimestamp)
+            .collect(Collectors.toList()));
+
     // For any rollbacks and restores, we cannot neglect the instants that they are rolling back.
     // The rollback instant should be more recent than the start of the timeline for it to have rolled back any
     // instant which we have a log block for.
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index d37dbab3d82..81ba4f2a66b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -94,6 +94,7 @@ import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
 import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
 import static org.apache.hudi.avro.HoodieAvroUtils.convertValueForSpecificDataTypes;
 import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
+import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper;
@@ -111,6 +112,11 @@ public class HoodieTableMetadataUtil {
   public static final String PARTITION_NAME_COLUMN_STATS = "column_stats";
   public static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
 
+  // This suffix is used by the delta commits from the async indexer (`HoodieIndexer`),
+  // when the `indexUptoInstantTime` already exists in the metadata table,
+  // to avoid collision.
+  public static final String METADATA_INDEXER_TIME_SUFFIX = "004";
+
   /**
    * Returns whether the files partition of metadata table is ready for read.
    *
@@ -1380,4 +1386,18 @@ public class HoodieTableMetadataUtil {
     inflightAndCompletedPartitions.addAll(tableConfig.getMetadataPartitions());
     return inflightAndCompletedPartitions;
   }
+
+  /**
+   * Checks if a delta commit in the metadata table is written by the async indexer.
+   * <p>
+   * TODO(HUDI-5733): This should be cleaned up once the proper fix of rollbacks in the
+   * metadata table is landed.
+   *
+   * @param instantTime Instant time to check.
+   * @return {@code true} if from async indexer; {@code false} otherwise.
+   */
+  public static boolean isIndexingCommit(String instantTime) {
+    return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + METADATA_INDEXER_TIME_SUFFIX.length()
+        && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
+  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index ac7b86f4cfa..67504cb957d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -21,18 +21,25 @@ package org.apache.hudi.utilities;
 
 import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
 import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
@@ -56,6 +63,9 @@ import java.util.stream.Stream;
 
 import static org.apache.hudi.common.table.HoodieTableMetaClient.reload;
 import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static org.apache.hudi.config.HoodieWriteConfig.CLIENT_HEARTBEAT_INTERVAL_IN_MS;
+import static org.apache.hudi.config.HoodieWriteConfig.CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.METADATA_INDEXER_TIME_SUFFIX;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileSystemView;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
 import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
@@ -123,7 +133,7 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
     String tableName = "indexer_test";
     // enable files and bloom_filters on the regular write client
     HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
-    initializeWriteClient(metadataConfigBuilder.build(), tableName);
+    upsertToTable(metadataConfigBuilder.build(), tableName);
 
     // validate table config
     assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
@@ -138,7 +148,7 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
     String tableName = "indexer_test";
     // enable files and bloom_filters on the regular write client
     HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false).withMetadataIndexBloomFilter(true);
-    initializeWriteClient(metadataConfigBuilder.build(), tableName);
+    upsertToTable(metadataConfigBuilder.build(), tableName);
 
     // validate table config
     assertFalse(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
@@ -147,6 +157,78 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
     indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS}), tableName);
   }
 
+  @Test
+  public void testIndexerWithWriter() throws IOException {
+    // Test the case where the indexer is running, i.e., the delta commit in the metadata table
+    // is inflight, while the regular writer is updating the metadata table.
+    // The delta commit from the indexer should not be rolled back.
+    String tableName = "indexer_with_writer";
+    // Enable files and bloom_filters on the regular write client
+    HoodieMetadataConfig.Builder metadataConfigBuilder =
+        getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
+    HoodieMetadataConfig metadataConfig = metadataConfigBuilder.build();
+    upsertToTable(metadataConfig, tableName);
+
+    // Validate table config
+    assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
+    assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));
+
+    // Run the async indexer, creating a new indexing instant in the data table and a new delta
+    // commit in the metadata table, with the suffix "004"
+    scheduleAndExecuteIndexing(COLUMN_STATS, tableName);
+
+    HoodieInstant indexingInstant = metaClient.getActiveTimeline()
+        .filter(i -> HoodieTimeline.INDEXING_ACTION.equals(i.getAction()))
+        .getInstants().get(0);
+    HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(
+        metaClient.getActiveTimeline().readIndexPlanAsBytes(indexingInstant).get());
+    String indexUptoInstantTime = indexPlan.getIndexPartitionInfos().get(0).getIndexUptoInstant();
+    HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(
+        context(), metadataConfig, metaClient.getBasePathV2().toString(),
+        getWriteConfigBuilder(basePath(), tableName).build().getSpillableMapBasePath());
+    HoodieTableMetaClient metadataMetaClient = metadata.getMetadataMetaClient();
+    String mdtCommitTime = indexUptoInstantTime + METADATA_INDEXER_TIME_SUFFIX;
+    assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(mdtCommitTime));
+
+    // Revert both instants to the inflight state, to simulate inflight indexing instants
+    metaClient.getActiveTimeline().revertToInflight(indexingInstant);
+    metaClient = reload(metaClient);
+
+    HoodieInstant mdtIndexingCommit = metadataMetaClient.getActiveTimeline()
+        .filter(i -> i.getTimestamp().equals(mdtCommitTime))
+        .getInstants().get(0);
+    metadataMetaClient.getActiveTimeline().revertToInflight(mdtIndexingCommit);
+    metadataMetaClient = reload(metadataMetaClient);
+    // Simulate heartbeats for the ongoing write from the async indexer in the metadata table
+    HoodieHeartbeatClient heartbeatClient = new HoodieHeartbeatClient(
+        metadataMetaClient.getFs(), metadataMetaClient.getBasePathV2().toString(),
+        CLIENT_HEARTBEAT_INTERVAL_IN_MS.defaultValue().longValue(),
+        CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES.defaultValue());
+    heartbeatClient.start(mdtCommitTime);
+
+    upsertToTable(metadataConfig, tableName);
+    metaClient = reload(metaClient);
+    metadataMetaClient = reload(metadataMetaClient);
+    // The delta commit from the async indexer in the metadata table should not be rolled back
+    assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(mdtIndexingCommit.getTimestamp()));
+    assertTrue(metadataMetaClient.getActiveTimeline().getRollbackTimeline().empty());
+
+    // Simulate heartbeat timeout
+    heartbeatClient.stop(mdtCommitTime);
+    upsertToTable(metadataConfig, tableName);
+    metaClient = reload(metaClient);
+    metadataMetaClient = reload(metadataMetaClient);
+    // The delta commit from the async indexer in the metadata table should be rolled back now
+    assertFalse(metadataMetaClient.getActiveTimeline().containsInstant(mdtIndexingCommit.getTimestamp()));
+    assertEquals(1, metadataMetaClient.getActiveTimeline().getRollbackTimeline().countInstants());
+    HoodieInstant rollbackInstant = metadataMetaClient.getActiveTimeline()
+        .getRollbackTimeline().firstInstant().get();
+    HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
+        metadataMetaClient.getActiveTimeline().readRollbackInfoAsBytes(rollbackInstant).get());
+    assertEquals(mdtCommitTime, rollbackMetadata.getInstantsRollback()
+        .stream().findFirst().get().getCommitTime());
+  }
+
   private static Stream<Arguments> colStatsFileGroupCountParams() {
     return Stream.of(
         Arguments.of(1),
@@ -163,7 +245,7 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
     String tableName = "indexer_test";
     // enable files and bloom_filters on the regular write client
     HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false).withMetadataIndexBloomFilter(true);
-    initializeWriteClient(metadataConfigBuilder.build(), tableName);
+    upsertToTable(metadataConfigBuilder.build(), tableName);
 
     // validate table config
     assertFalse(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
@@ -190,7 +272,7 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
     String tableName = "indexer_test";
     // enable files and bloom_filters on the regular write client
     HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false);
-    initializeWriteClient(metadataConfigBuilder.build(), tableName);
+    upsertToTable(metadataConfigBuilder.build(), tableName);
 
     // validate table config
     assertFalse(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
@@ -227,12 +309,12 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
     assertEquals(partitionFileSlices.size(), HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.defaultValue());
   }
 
-  private void initializeWriteClient(HoodieMetadataConfig metadataConfig, String tableName) {
+  private void upsertToTable(HoodieMetadataConfig metadataConfig, String tableName) {
     HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath(), tableName);
     HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfig).build();
     // do one upsert with synchronous metadata update
     SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig);
-    String instant = "0001";
+    String instant = HoodieActiveTimeline.createNewInstantTime();
     writeClient.startCommitWithTime(instant);
     List<HoodieRecord> records = DATA_GENERATOR.generateInserts(instant, 100);
     JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(records, 1), instant);
@@ -240,8 +322,7 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
     assertNoWriteErrors(statuses);
   }
 
-  private void indexMetadataPartitionsAndAssert(MetadataPartitionType partitionTypeToIndex, List<MetadataPartitionType> alreadyCompletedPartitions, List<MetadataPartitionType> nonExistantPartitions,
-                                                String tableName) {
+  private void scheduleAndExecuteIndexing(MetadataPartitionType partitionTypeToIndex, String tableName) {
     HoodieIndexer.Config config = new HoodieIndexer.Config();
     String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
     config.basePath = basePath();
@@ -258,6 +339,13 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
 
     // validate table config
     metaClient = reload(metaClient);
+  }
+
+  private void indexMetadataPartitionsAndAssert(MetadataPartitionType partitionTypeToIndex, List<MetadataPartitionType> alreadyCompletedPartitions, List<MetadataPartitionType> nonExistantPartitions,
+                                                String tableName) {
+    scheduleAndExecuteIndexing(partitionTypeToIndex, tableName);
+
+    // validate table config
     Set<String> completedPartitions = metaClient.getTableConfig().getMetadataPartitions();
     assertTrue(completedPartitions.contains(partitionTypeToIndex.getPartitionPath()));
     alreadyCompletedPartitions.forEach(entry -> assertTrue(completedPartitions.contains(entry.getPartitionPath())));
@@ -277,7 +365,7 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
     HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build();
     // do one upsert with synchronous metadata update
     SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig);
-    String instant = "0001";
+    String instant = HoodieActiveTimeline.createNewInstantTime();
     writeClient.startCommitWithTime(instant);
     List<HoodieRecord> records = DATA_GENERATOR.generateInserts(instant, 100);
     JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(records, 1), instant);
@@ -331,7 +419,7 @@ public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implemen
     HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build();
     // do one upsert with synchronous metadata update
     SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig);
-    String instant = "0001";
+    String instant = HoodieActiveTimeline.createNewInstantTime();
     writeClient.startCommitWithTime(instant);
     List<HoodieRecord> records = DATA_GENERATOR.generateInserts(instant, 100);
     JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(records, 1), instant);
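
To make the suffix scheme in this commit concrete: the async indexer bootstraps the new metadata partition with a delta commit at `indexUptoInstantTime + "004"`, and `isIndexingCommit` later recognizes such commits purely by length and suffix. The sketch below restates that logic outside Hudi; the standalone class, the `main` method, and the sample timestamp are illustrative only, and the value 17 for `MILLIS_INSTANT_ID_LENGTH` is an assumption based on Hudi's `yyyyMMddHHmmssSSS` instant-time format.

    // Illustrative sketch of the "004" suffix scheme introduced by this commit.
    public class IndexingCommitSuffixExample {
      // Mirrors HoodieTableMetadataUtil.METADATA_INDEXER_TIME_SUFFIX
      static final String METADATA_INDEXER_TIME_SUFFIX = "004";
      // Assumed length of a millisecond-granularity instant time (yyyyMMddHHmmssSSS)
      static final int MILLIS_INSTANT_ID_LENGTH = 17;

      // Same check as HoodieTableMetadataUtil.isIndexingCommit in the diff above
      static boolean isIndexingCommit(String instantTime) {
        return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + METADATA_INDEXER_TIME_SUFFIX.length()
            && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
      }

      public static void main(String[] args) {
        String indexUptoInstantTime = "20230212033010123";           // hypothetical data-table instant
        String mdtDeltaCommitTime = indexUptoInstantTime + METADATA_INDEXER_TIME_SUFFIX;
        System.out.println(mdtDeltaCommitTime);                      // 20230212033010123004
        System.out.println(isIndexingCommit(mdtDeltaCommitTime));    // true
        System.out.println(isIndexingCommit(indexUptoInstantTime));  // false
      }
    }

Because the suffixed instant time differs from `indexUptoInstantTime` itself, the indexer's delta commit cannot collide with a completed delta commit already synced to the metadata table at that instant.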
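The lazy-rollback side of the fix can be summarized the same way: a regular write rolls back an inflight, "004"-suffixed delta commit only after the indexer's heartbeat has expired, instead of eagerly on every write. Below is a condensed restatement of `getFailedIndexingCommitsToRollback`, with the timeline and heartbeat client reduced to plain strings and a predicate; this scaffolding is hypothetical and not Hudi API.

    import java.util.List;
    import java.util.function.Predicate;
    import java.util.stream.Collectors;

    // Condensed restatement of BaseHoodieTableServiceClient#getFailedIndexingCommitsToRollback:
    // only inflight indexing delta commits whose heartbeat has expired become rollback candidates.
    class LazyIndexingRollbackSketch {
      static List<String> rollbackCandidates(List<String> inflightInstants,
                                             Predicate<String> heartbeatExpired) {
        return inflightInstants.stream()
            .filter(t -> t.endsWith("004"))  // "004"-suffixed, i.e., written by the async indexer
            .filter(heartbeatExpired)        // a still-running indexer is left alone
            .collect(Collectors.toList());
      }
    }

This is also why the test above keeps a heartbeat alive while the simulated indexer is "running" (the commit survives a concurrent upsert) and only observes the rollback after heartbeatClient.stop() lets the heartbeat expire.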