nsivabalan commented on pull request #4905: URL: https://github.com/apache/hudi/pull/4905#issuecomment-1053612275
I spent time to go over all usages of both configs and here is my understanding. Lets see what was the status prior to merging https://github.com/apache/hudi/pull/4828 for simplicitly, lets talk about one config i.e async enable clustering. for this, we have one config at writer client layer(hoodie.clustering.async.enabled) and one at data source layer (hoodie.datasource.clustering.async.enable). lets call each of them writeClientAsyncConfig and dataSourceAsyncConfig for explanation purpose. DetlaStreamer flow: here, dataSourceAsyncConfig is never in the picture. we check if writeClientAsyncConfig is set and go about it. no confusion here. Spark Datasource flow: We expect users to set datasourceAsyncConfig in this case. And within HoodieSparkSqlWriter, while configuring/instantiating HoodieWriteClientConfig, based on whats set of datasourceAsyncConfig, hudi sets the writeClientAsyncConfig. Excerpt from DataSourceUtils ``` public static HoodieWriteConfig createHoodieConfig(String schemaStr, String basePath, String tblName, Map<String, String> parameters) { . . boolean asyncClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key())); . HoodieWriteConfig.newBuilder().... .withClusteringConfig(HoodieClusteringConfig.newBuilder() .withAsyncClustering(asyncClusteringEnabled).build()) . ``` So, one bug I see here is, if someone sets writeClientAsyncConfig with spark datasource write, probably we may not honor, bcoz, we explicitly check the value of dataSourceAsyncConfig and set the writeClientAsyncConfig in HoodieWriteConfig while building the writeConfig. We have another method in HoodieSparkSqlWriter where we check if asyncClustering is enabled. ``` private def isAsyncClusteringEnabled(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]], parameters: Map[String, String]): Boolean = { log.info(s"Config.asyncClusteringEnabled ? ${client.getConfig.isAsyncClusteringEnabled}") asyncClusteringTriggerFnDefined && client.getConfig.isAsyncClusteringEnabled && parameters.get(DataSourceOptions.ASYNC_CLUSTERING_ENABLE.key).exists(r => r.toBoolean) } ``` this is called/used only in HoodieStreaming sink flow. Anyways. Lets see what happens if a) User sets just the datasourceAsyncConfig. writeConfig will get instantiation properly and so client.getConfig.isAsyncClusteringEnabled will be true and parameters.get(DataSourceOptions.ASYNC_CLUSTERING_ENABLE.key) will also be true. and we should be good here. b) User sets just the writeClientConfig. writeConfig instantiation will have a gap here. and so client.getConfig.isAsyncClusteringEnabled will return false. Now, lets revisit whats the status with latest master (i.e. after landing https://github.com/apache/hudi/pull/4828) No change in DeltaStreamer flow. Spark DataSource flow: Excerpt from DataSourceUtils ``` public static HoodieWriteConfig createHoodieConfig(String schemaStr, String basePath, String tblName, Map<String, String> parameters) { . . boolean asyncClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key())); . HoodieWriteConfig.newBuilder().... .withClusteringConfig(HoodieClusteringConfig.newBuilder() .withAsyncClustering(asyncClusteringEnabled).build()) . ``` dataSourceAsyncConfig (DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key()) refers to same config key as writeClientAsyncConfig now. So, essentially the previously used dataSourceAsyncConfig does not exist anymore. and the other method of interest is ``` private def isAsyncClusteringEnabled(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]], parameters: Map[String, String]): Boolean = { log.info(s"Config.asyncClusteringEnabled ? ${client.getConfig.isAsyncClusteringEnabled}") asyncClusteringTriggerFnDefined && client.getConfig.isAsyncClusteringEnabled && parameters.get(DataSourceOptions.ASYNC_CLUSTERING_ENABLE.key).exists(r => r.toBoolean) } ``` Now, lets see both options. a) User sets just old dataSourceAsyncConfig. parameters.get(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key()) while instantiating writeConfig may or may not work. Even though we have added alternate keys, not sure what exactly gets returned as part of DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(). Assuming both keys work, within isAsyncClusteringEnabled: client.getConfig.isAsyncClusteringEnabled will be true, -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org