raghavant-git opened a new issue, #8016: URL: https://github.com/apache/hudi/issues/8016
Hello Team,

We are using Hudi 0.12.0 via AWS EMR (Hive 3.1.3, Spark 3.3.0).

Setup: the source data comes from Kafka, and the current Hudi table has around 35 million rows, partitioned by month and sorted by record key. When we try to enable inline clustering as part of the ingestion pipeline, we get the error below and are struggling to find the root cause. Could someone help?

The initial load was done using bulk insert with partition sort (clustering disabled), and a few incremental loads were done as upserts (also with clustering disabled). When we enabled inline clustering on the subsequent incremental loads, we received the error below. We also tried enabling inline clustering from the initial bulk load onward, on the assumption that starting clustering midway might be the problem, but that did not help either. A simplified sketch of how we invoke these writes is included below, after the Hudi options.

**Stack trace:**

```
An error occurred while calling o303.save.
: org.apache.hudi.exception.HoodieClusteringException: Clustering failed to write to files:b963cc2e-d0be-4289-a18f-6be70645ab7b-0,61279273-e4d4-451f-beda-b732ca8bbd29-0,464179a4-f296-4196-b2da-2ebcb3fa5fa4-0,f3db0562-edda-48c1-b7c2-5f352756cbae-0,c572602d-42a7-40dc-a97b-f97fccfb6c20-0,a33b231e-2e9d-4dc5-866f-f048c6d226be-0,ab98e0a8-f658-461f-b150-1a2e62a460b8-0,8aa84b9e-ab72-485b-8170-991efaea7578-0,73110ae6-7ea7-4ac6-a132-2c5626ce5ab2-0,a67f1e7e-f573-4556-944f-00a1d986415d-0,42da379d-67ce-4b07-b18c-bf62005cc77b-0,b3ed4dbe-b1b6-414b-8b9f-c256144f9236-0,ac381193-5b88-4cff-9d2f-413245409ff4-0,671c4ecb-2468-405d-b4c5-90fe78fa5c52-0,8c2f6002-2700-4887-a071-b02be470dd60-0,f2cfa1e7-0db4-40dd-87d1-74b44a8aaecf-0,598c8a79-0a5f-46ed-ac03-54d440c01f95-0,99e31b74-352d-4534-8609-9ae161e76b40-0,b8431fac-27af-4989-abd1-e6a00879696c-0,8910e1bb-0de6-4d9a-ace1-df8dd733aedb-0,4c122e2b-28d8-4df4-8df2-ca063f92f838-0,23937c89-bebe-4d07-9f06-51c2ad086b6b-0,2b000a1d-d32a-4ef9-9a0d-522766be40c6-0,fb5f7a16-9de1-4d47-90d8-fc3c9e512e72-0,38f6f68a-0ebc-4062-a559-b0f0bcb8318a-0,98cedd2e-112a-4b80-b7d1-5cb1a56b8dbd-0,0acc0c8c-0d71-4e67-acdb-84b0a811dadf-0,fe1984dd-174a-4beb-8868-684598d96261-0,08bec3c7-bfd6-4b41-8a4e-1776697a77b4-0,16b33302-de98-4a51-a4ea-c0621e05bdea-0,aec10f46-8a87-42e8-a77c-479a2f6eaeec-0
    at org.apache.hudi.client.SparkRDDWriteClient.completeClustering(SparkRDDWriteClient.java:382)
    at org.apache.hudi.client.SparkRDDWriteClient.completeTableService(SparkRDDWriteClient.java:474)
    at org.apache.hudi.client.SparkRDDWriteClient.cluster(SparkRDDWriteClient.java:369)
    at org.apache.hudi.client.BaseHoodieWriteClient.lambda$inlineClustering$17(BaseHoodieWriteClient.java:1382)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.client.BaseHoodieWriteClient.inlineClustering(BaseHoodieWriteClient.java:1380)
    at org.apache.hudi.client.BaseHoodieWriteClient.runTableServicesInline(BaseHoodieWriteClient.java:582)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:249)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:126)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:701)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:345)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
```

**Hudi Options:**

```python
self.hudi_options = {
    "hoodie.table.name": f"{self.table_name}",
    "hoodie.datasource.write.recordkey.field": f"{self.id}",
    "hoodie.datasource.write.partitionpath.field": f"{self.partition_field}",
    "hoodie.datasource.write.precombine.field": f"{self.precombine_field}",
    "hoodie.datasource.write.table.name": f"{self.table_name}",
    "hoodie.datasource.hive_sync.database": f"{self.db}",
    "hoodie.datasource.hive_sync.table": f"{self.table_name}",
    "hoodie.clustering.plan.strategy.sort.columns": f"{self.sort_columns}",
    "hoodie.metadata.index.bloom.filter.column.list": f"{self.bloom_index_columns}",
    "hoodie.clustering.inline": f"{self.clustering_inline}",
    "hoodie.index.type": f"{self.index_type}",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.metadata.index.bloom.filter.enable": "true",
    "hoodie.bloom.index.update.partition.path": "true",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    "hoodie.clustering.inline.max.commits": 1,
    "hoodie.clustering.plan.strategy.small.file.limit": 314572800,
    "hoodie.clustering.plan.strategy.max.bytes.per.group": 2147483648,
    "hoodie.memory.merge.max.size": "25073741824",
    "hoodie.layout.optimize.enable": "true",
    "hoodie.layout.optimize.strategy": "hilbert",
    "hoodie.datasource.write.row.writer.enable": "true",
    "hoodie.upsert.shuffle.parallelism": 300,
    "hoodie.insert.shuffle.parallelism": 300,
    "hoodie.bulkinsert.shuffle.parallelism": 300,
    "hoodie.bulkinsert.sort.mode": "PARTITION_SORT",
    "hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",
    "hoodie.cleaner.fileversions.retained": 3,
    "hoodie.cleaner.parallelism": 200,
    "hoodie.metadata.index.bloom.filter.parallelism": 200,
    "hoodie.datasource.hive_sync.support_timestamp": "true",
    "hoodie.parquet.max.file.size": "1073741824",
}
```
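To make the flow concrete, here is a minimal sketch of how the initial load is invoked, assuming the standard Hudi Spark datasource API. The function name, `source_df`, and `table_path` are illustrative placeholders, not our exact code; only the operation and clustering overrides differ from the options dict above.

```python
# Illustrative sketch (not our exact code) of the initial load:
# bulk insert with partition sort, inline clustering disabled.
def initial_load(source_df, hudi_options, table_path):
    options = {
        **hudi_options,
        "hoodie.datasource.write.operation": "bulk_insert",
        "hoodie.clustering.inline": "false",
    }
    (source_df.write
        .format("hudi")
        .options(**options)
        .mode("overwrite")
        .save(table_path))
```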
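The incremental loads that hit the error are invoked roughly like this (again a simplified, illustrative sketch). This is the write during whose commit the inline clustering runs and fails with the `HoodieClusteringException` above.

```python
# Illustrative sketch (not our exact code) of an incremental load:
# upsert with inline clustering enabled on the same options dict.
def incremental_load(incremental_df, hudi_options, table_path):
    options = {
        **hudi_options,
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.clustering.inline": "true",
    }
    (incremental_df.write
        .format("hudi")
        .options(**options)
        .mode("append")
        .save(table_path))
```

Please let me know if more information is needed, thanks. Appreciate your support!!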