nsivabalan commented on code in PR #13236:
URL: https://github.com/apache/hudi/pull/13236#discussion_r2071196025
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -224,45 +327,100 @@ public JavaRDD<WriteStatus>
bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> pr
initTable(WriteOperationType.BULK_INSERT_PREPPED,
Option.ofNullable(instantTime));
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED,
table.getMetaClient());
+ Option<HoodieTableMetadataWriter> metadataWriterOpt =
getMetadataWriter(instantTime, table.getMetaClient());
HoodieWriteMetadata<HoodieData<WriteStatus>> result =
table.bulkInsertPrepped(context, instantTime, HoodieJavaRDD.of(preppedRecords),
bulkInsertPartitioner);
- HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD =
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+ HoodieData<WriteStatus> allWriteStatus =
maybeStreamWriteToMetadataTable(result, metadataWriterOpt, table, instantTime);
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD =
result.clone(HoodieJavaRDD.getJavaRDD(allWriteStatus));
return postWrite(resultRDD, instantTime, table);
}
@Override
public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, String
instantTime) {
HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>,
HoodieData<WriteStatus>> table = initTable(WriteOperationType.DELETE,
Option.ofNullable(instantTime));
preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient());
+ Option<HoodieTableMetadataWriter> metadataWriterOpt =
getMetadataWriter(instantTime, table.getMetaClient());
HoodieWriteMetadata<HoodieData<WriteStatus>> result =
table.delete(context, instantTime, HoodieJavaRDD.of(keys));
- HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD =
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+ HoodieData<WriteStatus> allWriteStatus =
maybeStreamWriteToMetadataTable(result, metadataWriterOpt, table, instantTime);
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD =
result.clone(HoodieJavaRDD.getJavaRDD(allWriteStatus));
return postWrite(resultRDD, instantTime, table);
}
@Override
public JavaRDD<WriteStatus> deletePrepped(JavaRDD<HoodieRecord<T>>
preppedRecord, String instantTime) {
HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>,
HoodieData<WriteStatus>> table = initTable(WriteOperationType.DELETE_PREPPED,
Option.ofNullable(instantTime));
preWrite(instantTime, WriteOperationType.DELETE_PREPPED,
table.getMetaClient());
+ Option<HoodieTableMetadataWriter> metadataWriterOpt =
getMetadataWriter(instantTime, table.getMetaClient());
HoodieWriteMetadata<HoodieData<WriteStatus>> result =
table.deletePrepped(context,instantTime, HoodieJavaRDD.of(preppedRecord));
- HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD =
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+ HoodieData<WriteStatus> allWriteStatus =
maybeStreamWriteToMetadataTable(result, metadataWriterOpt, table, instantTime);
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD =
result.clone(HoodieJavaRDD.getJavaRDD(allWriteStatus));
return postWrite(resultRDD, instantTime, table);
}
public HoodieWriteResult deletePartitions(List<String> partitions, String
instantTime) {
HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>,
HoodieData<WriteStatus>> table = initTable(WriteOperationType.DELETE_PARTITION,
Option.ofNullable(instantTime));
preWrite(instantTime, WriteOperationType.DELETE_PARTITION,
table.getMetaClient());
HoodieWriteMetadata<HoodieData<WriteStatus>> result =
table.deletePartitions(context, instantTime, partitions);
- HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD =
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD =
result.clone(HoodieJavaRDD.getJavaRDD(result.getDataTableWriteStatuses()));
return new HoodieWriteResult(postWrite(resultRDD, instantTime, table),
result.getPartitionToReplaceFileIds());
}
public HoodieWriteResult managePartitionTTL(String instantTime) {
HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>,
HoodieData<WriteStatus>> table = initTable(WriteOperationType.DELETE_PARTITION,
Option.ofNullable(instantTime));
preWrite(instantTime, WriteOperationType.DELETE_PARTITION,
table.getMetaClient());
HoodieWriteMetadata<HoodieData<WriteStatus>> result =
table.managePartitionTTL(context, instantTime);
- HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD =
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD =
result.clone(HoodieJavaRDD.getJavaRDD(result.getDataTableWriteStatuses()));
return new HoodieWriteResult(postWrite(resultRDD, instantTime, table),
result.getPartitionToReplaceFileIds());
}
+ /**
+ * Creates a {@link HoodieTableMetadataWriter} instance to assist with
writing to metadata table.
+ * @param triggeringInstantTimestamp instant time of interest.
+ * @param metaClient data table's {@link HoodieTableMetaClient} instance.
+ * @return Option of {@link HoodieTableMetadataWriter} if we could
instantiate.
+ */
+ protected Option<HoodieTableMetadataWriter> getMetadataWriter(String
triggeringInstantTimestamp, HoodieTableMetaClient metaClient) {
Review Comment:
Getting or creating a new HoodieTableMetadataWriter instance for a given
instant of interest.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]