nsivabalan commented on code in PR #13236:
URL: https://github.com/apache/hudi/pull/13236#discussion_r2071196025


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -224,45 +327,100 @@ public JavaRDD<WriteStatus> 
bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> pr
         initTable(WriteOperationType.BULK_INSERT_PREPPED, 
Option.ofNullable(instantTime));
     table.validateInsertSchema();
     preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, 
table.getMetaClient());
+    Option<HoodieTableMetadataWriter> metadataWriterOpt = 
getMetadataWriter(instantTime, table.getMetaClient());
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = 
table.bulkInsertPrepped(context, instantTime, HoodieJavaRDD.of(preppedRecords), 
bulkInsertPartitioner);
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+    HoodieData<WriteStatus> allWriteStatus = 
maybeStreamWriteToMetadataTable(result, metadataWriterOpt, table, instantTime);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(allWriteStatus));
     return postWrite(resultRDD, instantTime, table);
   }
 
   @Override
   public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, String 
instantTime) {
     HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>> table = initTable(WriteOperationType.DELETE, 
Option.ofNullable(instantTime));
     preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient());
+    Option<HoodieTableMetadataWriter> metadataWriterOpt = 
getMetadataWriter(instantTime, table.getMetaClient());
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = 
table.delete(context, instantTime, HoodieJavaRDD.of(keys));
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+    HoodieData<WriteStatus> allWriteStatus = 
maybeStreamWriteToMetadataTable(result, metadataWriterOpt, table, instantTime);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(allWriteStatus));
     return postWrite(resultRDD, instantTime, table);
   }
 
   @Override
   public JavaRDD<WriteStatus> deletePrepped(JavaRDD<HoodieRecord<T>> 
preppedRecord, String instantTime) {
     HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>> table = initTable(WriteOperationType.DELETE_PREPPED, 
Option.ofNullable(instantTime));
     preWrite(instantTime, WriteOperationType.DELETE_PREPPED, 
table.getMetaClient());
+    Option<HoodieTableMetadataWriter> metadataWriterOpt = 
getMetadataWriter(instantTime, table.getMetaClient());
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = 
table.deletePrepped(context,instantTime, HoodieJavaRDD.of(preppedRecord));
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+    HoodieData<WriteStatus> allWriteStatus = 
maybeStreamWriteToMetadataTable(result, metadataWriterOpt, table, instantTime);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(allWriteStatus));
     return postWrite(resultRDD, instantTime, table);
   }
 
   public HoodieWriteResult deletePartitions(List<String> partitions, String 
instantTime) {
     HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>> table = initTable(WriteOperationType.DELETE_PARTITION, 
Option.ofNullable(instantTime));
     preWrite(instantTime, WriteOperationType.DELETE_PARTITION, 
table.getMetaClient());
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = 
table.deletePartitions(context, instantTime, partitions);
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(result.getDataTableWriteStatuses()));
     return new HoodieWriteResult(postWrite(resultRDD, instantTime, table), 
result.getPartitionToReplaceFileIds());
   }
 
   public HoodieWriteResult managePartitionTTL(String instantTime) {
     HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>> table = initTable(WriteOperationType.DELETE_PARTITION, 
Option.ofNullable(instantTime));
     preWrite(instantTime, WriteOperationType.DELETE_PARTITION, 
table.getMetaClient());
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = 
table.managePartitionTTL(context, instantTime);
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(result.getDataTableWriteStatuses()));
     return new HoodieWriteResult(postWrite(resultRDD, instantTime, table), 
result.getPartitionToReplaceFileIds());
   }
 
+  /**
+   * Creates a {@link HoodieTableMetadataWriter} instance to assist with 
writing to metadata table.
+   * @param triggeringInstantTimestamp instant time of interest.
+   * @param metaClient data table's {@link HoodieTableMetaClient} instance.
+   * @return Option of {@link HoodieTableMetadataWriter} if we could 
instantiate.
+   */
+  protected Option<HoodieTableMetadataWriter> getMetadataWriter(String 
triggeringInstantTimestamp, HoodieTableMetaClient metaClient) {

Review Comment:
   Gets or creates a new HoodieTableMetadataWriter instance for the given 
instant of interest.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to