nsivabalan commented on pull request #4905:
URL: https://github.com/apache/hudi/pull/4905#issuecomment-1053612275


   I spent time to go over all usages of both configs and here is my 
understanding. 
   
   Lets see what was the status prior to merging 
https://github.com/apache/hudi/pull/4828
   
   for simplicitly, lets talk about one config i.e async enable clustering. 
   for this, we have one config at writer client 
layer(hoodie.clustering.async.enabled) and one at data source layer 
(hoodie.datasource.clustering.async.enable). lets call each of them 
writeClientAsyncConfig and dataSourceAsyncConfig for explanation purpose. 
   
   DetlaStreamer flow:
   here, dataSourceAsyncConfig is never in the picture. we check if 
writeClientAsyncConfig is set and go about it. no confusion here.
   
   Spark Datasource flow: 
   We expect users to set datasourceAsyncConfig in this case. 
   And within HoodieSparkSqlWriter, while configuring/instantiating 
HoodieWriteClientConfig, based on whats set of datasourceAsyncConfig, hudi sets 
the writeClientAsyncConfig. 
   
   Excerpt from DataSourceUtils 
   ```
   
   public static HoodieWriteConfig createHoodieConfig(String schemaStr, String 
basePath,
         String tblName, Map<String, String> parameters) {
   .
   .
       boolean asyncClusteringEnabled = 
Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key()));
   .
   HoodieWriteConfig.newBuilder().... 
   .withClusteringConfig(HoodieClusteringConfig.newBuilder()
               .withAsyncClustering(asyncClusteringEnabled).build())
   .
   ```
   
   So, one bug I see here is, if someone sets writeClientAsyncConfig with spark 
datasource write, probably we may not honor, bcoz, we explicitly check the 
value of dataSourceAsyncConfig and set the writeClientAsyncConfig in 
HoodieWriteConfig while building the writeConfig.
   
   We have another method in HoodieSparkSqlWriter where we check if 
asyncClustering is enabled. 
   ```
     private def isAsyncClusteringEnabled(client: 
SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
                                          parameters: Map[String, String]): 
Boolean = {
       log.info(s"Config.asyncClusteringEnabled ? 
${client.getConfig.isAsyncClusteringEnabled}")
       asyncClusteringTriggerFnDefined && 
client.getConfig.isAsyncClusteringEnabled &&
         parameters.get(DataSourceOptions.ASYNC_CLUSTERING_ENABLE.key).exists(r 
=> r.toBoolean)
     }
   ```
   
   this is called/used only in HoodieStreaming sink flow. Anyways. 
   Lets see what happens if 
   a) User sets just the datasourceAsyncConfig.
   writeConfig will get instantiation properly and so 
client.getConfig.isAsyncClusteringEnabled will be true and 
parameters.get(DataSourceOptions.ASYNC_CLUSTERING_ENABLE.key) will also be true.
   and we should be good here. 
   b) User sets just the writeClientConfig. 
   writeConfig instantiation will have a gap here. and so 
client.getConfig.isAsyncClusteringEnabled will return false. 
   
   
   Now, lets revisit whats the status with latest master (i.e. after landing 
https://github.com/apache/hudi/pull/4828) 
   
   No change in DeltaStreamer flow.
   
   Spark DataSource flow: 
   Excerpt from DataSourceUtils 
   ```
   
   public static HoodieWriteConfig createHoodieConfig(String schemaStr, String 
basePath,
         String tblName, Map<String, String> parameters) {
   .
   .
       boolean asyncClusteringEnabled = 
Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key()));
   .
   HoodieWriteConfig.newBuilder().... 
   .withClusteringConfig(HoodieClusteringConfig.newBuilder()
               .withAsyncClustering(asyncClusteringEnabled).build())
   .
   ```
   dataSourceAsyncConfig 
(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key()) refers to same config 
key as writeClientAsyncConfig now. So, essentially the previously used 
dataSourceAsyncConfig does not exist anymore. 
   
   and the other method of interest is 
   ```
     private def isAsyncClusteringEnabled(client: 
SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
                                          parameters: Map[String, String]): 
Boolean = {
       log.info(s"Config.asyncClusteringEnabled ? 
${client.getConfig.isAsyncClusteringEnabled}")
       asyncClusteringTriggerFnDefined && 
client.getConfig.isAsyncClusteringEnabled &&
         parameters.get(DataSourceOptions.ASYNC_CLUSTERING_ENABLE.key).exists(r 
=> r.toBoolean)
     }
   ```
   
   Now, lets see both options.
   a) User sets just old dataSourceAsyncConfig. 
   parameters.get(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key()) while 
instantiating writeConfig may or may not work. Even though we have added 
alternate keys, not sure what exactly gets returned as part of 
DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(). 
   Assuming both keys work, within isAsyncClusteringEnabled: 
client.getConfig.isAsyncClusteringEnabled will be true, 
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to