[ https://issues.apache.org/jira/browse/HUDI-8471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-8471:
--------------------------------------
    Fix Version/s: 1.0.2

> Unify row writer and non-row writer code paths
> ----------------------------------------------
>
>                 Key: HUDI-8471
>                 URL: https://issues.apache.org/jira/browse/HUDI-8471
>             Project: Apache Hudi
>          Issue Type: Sub-task
>          Components: writer-core
>            Reporter: sivabalan narayanan
>            Assignee: sivabalan narayanan
>            Priority: Critical
>             Fix For: 1.0.1, 1.0.2
>
>
> The row writer uses the writeClient in an unconventional way compared to other 
> operations.
> A typical write operation takes the following flow:
> ```
> 1. WriteClient.upsert {
>      instantiate HoodieTable
>      result = table.upsert()
>      postWrite()
>      return HoodieData<WriteStatus>
>    }
> 2. writeClient.commitStats(return value from (1), i.e. HoodieData<WriteStatus>, ...)
>    // internally commits the write and then calls clean, archive, compaction, clustering etc.
>
> 1.a HoodieTable.upsert() {
>       calls into SparkUpsertCommitActionExecutor.execute()
>     }
> 1.a.i SparkUpsertCommitActionExecutor.execute() {
>         return HoodieWriteHelper.newInstance().write(...)
>       }
> 1.a.i.1 HoodieWriteHelper.newInstance().write() {
>           dedup records
>           tag records (index lookup)
>           return BaseCommitActionExecutor.execute()
>         }
> 1.a.i.1.a BaseSparkCommitActionExecutor.execute() {
>             build workload profile
>             getPartitioner
>             HoodieData<WriteStatus> writeStatuses =
>                 mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner); // this is where the writes happen
>             update index
>             return HoodieData<WriteStatus>
>           }
> ```
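The two-step flow quoted above (upsert returns write statuses, then the caller explicitly commits and triggers table services) can be sketched as a small self-contained model. Class and method names mirror the walkthrough, but these are simplified stand-ins, not Hudi's real API; the call log is only there to make the ordering visible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for Hudi's WriteClient / HoodieTable (assumption:
// simplified model of the flow above, not the actual Hudi classes).
class TypicalWriteFlow {
    static List<String> callLog = new ArrayList<>();

    static class WriteStatus {}

    static class HoodieTable {
        // 1.a -> 1.a.i -> 1.a.i.1: dedup, index lookup, then the commit action executor
        List<WriteStatus> upsert(List<String> records) {
            callLog.add("dedup");
            callLog.add("tagRecords");          // index lookup
            callLog.add("buildWorkloadProfile");
            callLog.add("writePartitions");     // mapPartitionsAsRDD: where the writes happen
            callLog.add("updateIndex");
            List<WriteStatus> statuses = new ArrayList<>();
            for (int i = 0; i < records.size(); i++) statuses.add(new WriteStatus());
            return statuses;
        }
    }

    static class WriteClient {
        // Step 1: upsert returns the write statuses to the caller...
        List<WriteStatus> upsert(List<String> records) {
            HoodieTable table = new HoodieTable();   // instantiate HoodieTable
            List<WriteStatus> result = table.upsert(records);
            callLog.add("postWrite");
            return result;
        }
        // Step 2: ...and the caller explicitly commits, which triggers table services.
        void commitStats(List<WriteStatus> statuses) {
            callLog.add("commit");
            callLog.add("clean");
            callLog.add("archive");
            callLog.add("compaction");
            callLog.add("clustering");
        }
    }

    public static void main(String[] args) {
        WriteClient client = new WriteClient();
        List<WriteStatus> statuses = client.upsert(Arrays.asList("r1", "r2"));
        client.commitStats(statuses);
        System.out.println(callLog);
    }
}
```

The key property of this path is that the write and the commit are two separate calls on the same client, so the caller holds the write statuses in between.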
> While the row writer looks like below:
> ```
> 1. HoodieSparkSqlWriter.write -> bulkInsertAsRow {
>      writeClient.startCommit
>      WriteResult = BaseDatasetBulkInsertCommitActionExecutor.execute()
>      // by the time we return from here, data is committed fully along w/ any inline table services.
>    }
> 1.a BaseDatasetBulkInsertCommitActionExecutor.execute {
>       write to custom spark ds
>     }
> 2. Custom Spark DS:
>    We have implemented a series of interfaces which goes as follows:
>    DefaultSource -> HoodieDataSourceInternalTable
>    HoodieDataSourceInternalTable.newWriteBuilder returns HoodieDataSourceInternalBatchWriteBuilder;
>    this builder's buildForBatch() returns a BatchWrite.
>    BatchWrite is core to our writes:
>    {
>      constructor: {
>        instantiate dataSourceInternalWriterHelper (DataSourceInternalWriterHelper)
>      }
>
>      DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
>        return new HoodieBulkInsertDataInternalWriterFactory(
>            dataSourceInternalWriterHelper.getHoodieTable(),
>            writeConfig, instantTime, structType, populateMetaFields, arePartitionRecordsSorted);
>      }
>
>      public void onDataWriterCommit(WriterCommitMessage message) {
>        dataSourceInternalWriterHelper.onDataWriterCommit(message.toString());
>      }
>
>      @Override
>      public void commit(WriterCommitMessage[] messages) {
>        List<WriteStatus> writeStatuses = Arrays.stream(messages)
>            .map(m -> (HoodieWriterCommitMessage) m)
>            .flatMap(m -> m.getWriteStatuses().stream())
>            .collect(Collectors.toList());
>        dataSourceInternalWriterHelper.commit(writeStatuses);
>      }
>
>      public void abort(WriterCommitMessage[] messages) {
>        dataSourceInternalWriterHelper.abort();
>      }
>    }
>
>    DataSourceInternalWriterHelper {
>      constructor {
>        instantiates WriteClient
>        writeClient.initTable
>        writeClient.preWrite
>      }
>
>      public void commit(List<WriteStatus> writeStatuses) {
>        writeClient.commitStats(...)
>      }
>    }
> ```
>
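The row-writer path above can likewise be modeled as a self-contained sketch. The point of contrast with the first flow: here the commit happens driver-side inside the DataSource's BatchWrite, which flattens the per-task commit messages and hands the statuses to a helper that owns the writeClient lifecycle. Again, these classes are simplified stand-ins (assumption: names echo the walkthrough, not Hudi's actual signatures).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of the row-writer commit path described above.
class RowWriterFlow {
    static List<String> callLog = new ArrayList<>();

    static class WriteStatus {}

    // Mirrors HoodieWriterCommitMessage: one message per task, carrying its statuses.
    static class CommitMessage {
        final List<WriteStatus> statuses;
        CommitMessage(List<WriteStatus> statuses) { this.statuses = statuses; }
        List<WriteStatus> getWriteStatuses() { return statuses; }
    }

    // Mirrors DataSourceInternalWriterHelper: owns the write client lifecycle.
    static class WriterHelper {
        WriterHelper() {
            callLog.add("initTable");   // writeClient.initTable
            callLog.add("preWrite");    // writeClient.preWrite
        }
        void commit(List<WriteStatus> statuses) {
            // writeClient.commitStats(...) — the commit is buried inside the DS write
            callLog.add("commitStats(" + statuses.size() + ")");
        }
    }

    // Mirrors the BatchWrite implementation: driver-side commit of all task messages.
    static class BatchWrite {
        final WriterHelper helper = new WriterHelper();
        void commit(CommitMessage[] messages) {
            List<WriteStatus> writeStatuses = Arrays.stream(messages)
                .flatMap(m -> m.getWriteStatuses().stream())
                .collect(Collectors.toList());
            helper.commit(writeStatuses);
        }
    }

    public static void main(String[] args) {
        BatchWrite write = new BatchWrite();
        // Two tasks finished, each reporting its write statuses:
        CommitMessage[] messages = {
            new CommitMessage(Arrays.asList(new WriteStatus())),
            new CommitMessage(Arrays.asList(new WriteStatus(), new WriteStatus()))
        };
        write.commit(messages);
        System.out.println(callLog);
    }
}
```

Because commitStats is invoked from inside the DataSource machinery rather than by the caller, the row-writer path has no point where write statuses are returned to the application before committing — which is exactly the asymmetry this ticket proposes to remove.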
> So, let's work towards unifying the row writer path with the non-row-writer path. 
> This might be required for us to get the re-designed DAG ready for the row writer 
> as well. 
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
