[GitHub] [hudi] nsivabalan commented on a diff in pull request #7632: [HUDI-3775] Allow for offline compaction of MOR tables via spark streaming

2023-04-27 Thread via GitHub


nsivabalan commented on code in PR #7632:
URL: https://github.com/apache/hudi/pull/7632#discussion_r1179510832


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##
@@ -483,6 +483,15 @@ object DataSourceWriteOptions {
   + "This could introduce the potential issue that the job is restart(`batch id` is lost) while spark checkpoint write fails, "
   + "causing spark will retry and rewrite the data.")
 
+  val STREAMING_DISABLE_COMPACTION: ConfigProperty[String] = ConfigProperty
+.key("hoodie.datasource.write.streaming.disable.compaction")
+.defaultValue("false")
+.sinceVersion("0.13.0")

Review Comment:
   Let's fix the version in `sinceVersion`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a diff in pull request #7632: [HUDI-3775] Allow for offline compaction of MOR tables via spark streaming

2023-01-17 Thread GitBox


nsivabalan commented on code in PR #7632:
URL: https://github.com/apache/hudi/pull/7632#discussion_r1072765472


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##
@@ -455,6 +455,15 @@ object DataSourceWriteOptions {
   + "This could introduce the potential issue that the job is restart(`batch id` is lost) while spark checkpoint write fails, "
   + "causing spark will retry and rewrite the data.")
 
+  val STREAMING_DISABLE_COMPACTION: ConfigProperty[String] = ConfigProperty

Review Comment:
   Inline compaction does not make sense for streaming ingestion. So the only
options users have are to run async compaction in a separate thread, or to
completely disable compaction within the ingestion process and run compaction
from a separate process altogether.
   
   Given this, I'm not sure how we can deduce this, because the default value of
"hoodie.compact.inline" is false, which means compaction is async. Can you help me
understand? I'm definitely interested to see if we can avoid the new config. We also
tried to follow what deltastreamer does for this, i.e. to introduce a top-level config.
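   
   To illustrate the concern, here is a minimal sketch of how a streaming sink could pick a compaction mode. The class, enum, and method names are hypothetical and not part of Hudi; only the two config keys come from this thread. It assumes streaming never compacts inline, so "hoodie.compact.inline" carries no signal and a separate flag is needed to express "offline".

```java
import java.util.Map;

// Hypothetical helper, not Hudi code: models the two choices a streaming
// writer has for MOR compaction, as described in the comment above.
final class StreamingCompactionMode {
  enum Mode { ASYNC_IN_PROCESS, OFFLINE }

  // Key proposed in this PR.
  static final String DISABLE_KEY =
      "hoodie.datasource.write.streaming.disable.compaction";

  static Mode resolve(Map<String, String> opts) {
    // "hoodie.compact.inline" is deliberately not consulted: its default
    // (false) already means "async", so it cannot tell async-in-process
    // apart from "compaction handled by another job".
    boolean disabled =
        Boolean.parseBoolean(opts.getOrDefault(DISABLE_KEY, "false"));
    return disabled ? Mode.OFFLINE : Mode.ASYNC_IN_PROCESS;
  }
}
```

   With no options set, or with only "hoodie.compact.inline" set to false, this sketch resolves to the async in-process mode; only the explicit flag selects offline compaction.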
   






[GitHub] [hudi] nsivabalan commented on a diff in pull request #7632: [HUDI-3775] Allow for offline compaction of MOR tables via spark streaming

2023-01-10 Thread GitBox


nsivabalan commented on code in PR #7632:
URL: https://github.com/apache/hudi/pull/7632#discussion_r1066307179


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java:
##
@@ -344,6 +351,11 @@ public Builder withInlineLogCompaction(Boolean inlineLogCompaction) {
   return this;
 }
 
+public Builder withOfflineCompaction(Boolean useOfflineCompaction) {

Review Comment:
   Let's align the method name with the variable name.
   



##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java:
##
@@ -67,6 +67,13 @@ public class HoodieCompactionConfig extends HoodieConfig {
   .withDocumentation("When set to true, logcompaction service is triggered after each write. While being "
   + " simpler operationally, this adds extra latency on the write path.");
 
+  public static final ConfigProperty DISABLE_ASYNC_COMPACT_FOR_SPARK_STREAMING = ConfigProperty

Review Comment:
   We have other streaming configs in DataSourceOptions, for example:
   ```
   
 val STREAMING_RETRY_CNT: ConfigProperty[String] = ConfigProperty
   .key("hoodie.datasource.write.streaming.retry.count")
   .defaultValue("3")
   .withDocumentation("Config to indicate how many times streaming job should retry for a failed micro batch.")
   
 val STREAMING_RETRY_INTERVAL_MS: ConfigProperty[String] = ConfigProperty
   .key("hoodie.datasource.write.streaming.retry.interval.ms")
   .defaultValue("2000")
   .withDocumentation(" Config to indicate how long (by millisecond) before a retry should issued for failed microbatch")
   
   ```
   
   So, let's move it there and follow the existing streaming prefix:
"hoodie.datasource.write.streaming.disable.compaction"
   



##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java:
##
@@ -67,6 +67,13 @@ public class HoodieCompactionConfig extends HoodieConfig {
   .withDocumentation("When set to true, logcompaction service is triggered after each write. While being "
   + " simpler operationally, this adds extra latency on the write path.");
 
+  public static final ConfigProperty DISABLE_ASYNC_COMPACT_FOR_SPARK_STREAMING = ConfigProperty
+  .key("hoodie.compact.disable.for.spark.streaming")
+  .defaultValue("false")
+  .withDocumentation("When set to true, compaction will not run inline nor asynchronously. Compaction can take up significant resources "

Review Comment:
   Some wordsmithing:
   ```
   By default, for MOR tables, async compaction is enabled with the Spark streaming
sink. Setting this config to true disables it, with the expectation that users
will schedule and execute compaction in a different process/job altogether.
Some users may wish to run it separately to manage resources across table
services and the regular ingestion pipeline, so this could be preferred in
such cases.
   ```



##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##
@@ -117,7 +119,8 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 retry(retryCnt, retryIntervalMs)(
   Try(
 HoodieSparkSqlWriter.write(
-  sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering),
+  sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient,
+  if (disableCompaction) None else Some(triggerAsyncCompactor), Some(triggerAsyncClustering),

Review Comment:
   Is it possible to add tests in TestStructuredStreaming to cover this
functionality?
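   
   As a rough idea of what such a test could assert, the sketch below models the changed wiring in plain Java. Everything here is hypothetical: the real trigger is a Scala function passed to HoodieSparkSqlWriter.write, which is simplified to a `Runnable`, and `writeBatch` is only a stand-in for the write path. A real test in TestStructuredStreaming would need to run an actual streaming query against a MOR table.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the diff above, not Hudi code.
final class CompactorWiringSketch {

  // Mirrors `if (disableCompaction) None else Some(triggerAsyncCompactor)`.
  static Optional<Runnable> asyncCompactorTrigger(boolean disableCompaction,
                                                  Runnable trigger) {
    return disableCompaction ? Optional.empty() : Optional.of(trigger);
  }

  // Stand-in for the write path: fires the trigger once per micro-batch,
  // if one was supplied.
  static void writeBatch(Optional<Runnable> trigger) {
    trigger.ifPresent(Runnable::run);
  }

  // Returns how many times the compactor was triggered across `batches`
  // simulated micro-batches.
  static int countTriggers(boolean disableCompaction, int batches) {
    AtomicInteger fired = new AtomicInteger();
    Optional<Runnable> trigger =
        asyncCompactorTrigger(disableCompaction, fired::incrementAndGet);
    for (int i = 0; i < batches; i++) {
      writeBatch(trigger);
    }
    return fired.get();
  }
}
```

   The property worth checking is that the trigger never fires when the flag is set, and still fires once per batch when it is not.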


