soumilshah1995 opened a new issue, #8919: URL: https://github.com/apache/hudi/issues/8919
Hello, hope all is well. While I was playing with clustering, I tried a few ways to perform clustering using stored procedures, and thought I would create an issue so it can be tracked. ``` try: import sys from pyspark.context import SparkContext from pyspark.sql.session import SparkSession from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions from pyspark.sql.types import * except Exception as e: print("Modules are missing: {}".format(e)) # Get command-line arguments args = getResolvedOptions(sys.argv, ['JOB_NAME']) # Create a Spark session and Glue context spark = (SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \ .config('spark.sql.hive.convertMetastoreParquet', 'false') \ .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \ .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \ .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate()) sc = spark.sparkContext glueContext = GlueContext(sc) job = Job(glueContext) logger = glueContext.get_logger() job.init(args['JOB_NAME'], args) db_name = "hudidb" table_name = "customers" path = "s3://soumilshah-hudi-demos/silver/table_name=customers/" query_show_commits = f"call show_commits('{db_name}.{table_name}', 5)" spark_df_commits = spark.sql(query_show_commits) commits = list(map(lambda row: row[0], spark_df_commits.collect())) spark_df_commits.show() try: print("Trying clustering 1..") query_show_clustering = f"call run_clustering('{db_name}.{table_name}')" spark_df_clusterings = spark.sql(query_show_clustering) spark_df_clusterings.show() print(" clustering 1 complete ") except Exception as e: print("Error 1", e) try: print("Try show clustering 2") query = f"call show_clustering('{db_name}.{table_name}')" result_df = spark.sql(query) result_df.show() print("Complete show clustering 2 ") except Exception as e: print("Error show clustering 
2", e) try: print("Try show clustering 1 ") query = f"call show_clustering('{path}')" result_df = spark.sql(query) result_df.show() print("Complete show clustering 1 ") except Exception as e: print("Error show clustering 1", e) ``` # Output ``` +-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+ | commit_time|total_bytes_written|total_files_added|total_files_updated|total_partitions_written|total_records_written|total_update_records_written|total_errors| +-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+ |20230609120910005| 13079729| 30| 0| 30| 72| 0| 0| |20230607130458189| 0| 0| 0| 0| 0| 0| 0| |20230607125355320| 18749045| 43| 0| 43| 100| 0| 0| +-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+ +-----------------+----------------+---------+-------------------+ | timestamp|input_group_size| state|involved_partitions| +-----------------+----------------+---------+-------------------+ |20230609121906926| 30|COMPLETED| *| +-----------------+----------------+---------+-------------------+ ``` # Output of show clustering ``` An error occurred while calling o91.sql. 
: java.util.NoSuchElementException: No value present in Option at org.apache.hudi.common.util.Option.get(Option.java:89) at org.apache.spark.sql.hudi.command.procedures.ShowClusteringProcedure.$anonfun$call$5(ShowClusteringProcedure.scala:79) at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418) at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173) at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163) at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418) at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173) at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163) at scala.collection.immutable.Stream.length(Stream.scala:312) at scala.collection.SeqLike.size(SeqLike.scala:108) at scala.collection.SeqLike.size$(SeqLike.scala:108) at scala.collection.AbstractSeq.size(Seq.scala:45) at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:341) at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339) at scala.collection.AbstractTraversable.toArray(Traversable.scala:108) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224) at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139) at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:222) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:102) at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConne ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org