Re: [I] [SUPPORT] RFC 63 Functional Index Hudi 0.1.0-beta [hudi]
soumilshah1995 closed issue #10110: [SUPPORT] RFC 63 Functional Index Hudi 0.1.0-beta URL: https://github.com/apache/hudi/issues/10110 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] [SUPPORT] RFC 63 Functional Index Hudi 0.1.0-beta [hudi]
soumilshah1995 commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-2276657019

Spoke to Sudha; she gave me some nice feedback. I will try those items.
Re: [I] [SUPPORT] RFC 63 Functional Index Hudi 0.1.0-beta [hudi]
soumilshah1995 commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-2275701103

```
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType
from pyspark.sql.functions import hour, col
from datetime import datetime, timedelta
import os
import sys
import random

# Configuration
HUDI_VERSION = '1.0.0-beta2'
SPARK_VERSION = '3.4'
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Initialize Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

# Generate mock event data
def generate_event_data(num_events):
    event_types = ["click", "view", "purchase", "signup"]
    start_time = datetime(2023, 1, 1)
    data = []
    for i in range(num_events):
        event = {
            "event_id": i + 1,
            "user_id": random.randint(1, 100),
            "event_type": random.choice(event_types),
            "timestamp": (start_time + timedelta(hours=random.randint(0, 5000))).strftime("%Y-%m-%d %H:%M:%S")
        }
        data.append(event)
    return data

# Create DataFrame
num_events = 1
events_data = generate_event_data(num_events)
df = spark.createDataFrame(events_data)
df.show()

# Write DataFrame to Hudi table
table_name = "web_events"
path = f'file:///Users/soumilshah/Desktop/{table_name}/'

df.write.format("hudi") \
    .option("hoodie.table.name", table_name) \
    .option("hoodie.datasource.write.recordkey.field", "event_id") \
    .option("hoodie.datasource.write.partitionpath.field", "") \
    .option("hoodie.datasource.write.precombine.field", "timestamp") \
    .option("hoodie.table.metadata.enable", "true") \
    .option("hoodie.metadata.index.column.stats.enable", "true") \
    .option("path", path) \
    .mode("overwrite") \
    .saveAsTable(table_name)

# Create functional index on timestamp column
query_create_ts_datestr = """
CREATE INDEX IF NOT EXISTS ts_datestr ON web_events
USING column_stats(timestamp)
OPTIONS(func='from_unixtime', format='yyyy-MM-dd')
"""
spark.sql(query_create_ts_datestr).show()

# Query data for a specific date
spark.sql("""
SELECT event_type, user_id, event_id
FROM web_events
WHERE date_format(timestamp, 'yyyy-MM-dd') = '2023-06-17'
""").show()

# Explain query plan for date-based query
spark.sql("""
EXPLAIN SELECT event_type, user_id, event_id
FROM web_events
WHERE date_format(timestamp, 'yyyy-MM-dd') = '2023-06-17'
""").show(truncate=False)

# Create functional index on hour of timestamp
query_create_ts_hour = """
CREATE INDEX ts_hour ON web_events
USING column_stats(timestamp)
OPTIONS(func='hour')
"""
spark.sql(query_create_ts_hour)

# Query data aggregated by hour
spark.sql("""
SELECT hour(timestamp) AS hour_of_day, COUNT(*) AS event_count
FROM web_events
GROUP BY hour(timestamp)
""").show()
```

Is this a good example to observe a reduction in the number of files read before and after creating a functional index? If not, what changes should I make to the dataset or my query method to clearly see a decrease in the number of files read? I look forward to your guidance on this.
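[Editor's note] One thing worth checking in the script above: with `num_events = 1` the table is a single file, so there is nothing for the index to skip. A quick back-of-the-envelope sketch (plain Python, reusing the thread's own generator with a seed added for repeatability; the event counts are illustrative assumptions, not measured Hudi behavior) shows why a larger dataset spread over many days is needed before a date predicate can prune file groups:

```python
import random
from datetime import datetime, timedelta

def generate_event_data(num_events, seed=42):
    """Same mock generator as in the script above, seeded so runs repeat."""
    random.seed(seed)
    event_types = ["click", "view", "purchase", "signup"]
    start_time = datetime(2023, 1, 1)
    return [
        {
            "event_id": i + 1,
            "user_id": random.randint(1, 100),
            "event_type": random.choice(event_types),
            # events spread over 5000 hours, i.e. roughly 208 calendar days
            "timestamp": (start_time + timedelta(hours=random.randint(0, 5000))).strftime("%Y-%m-%d %H:%M:%S"),
        }
        for i in range(num_events)
    ]

events = generate_event_data(50_000)
distinct_days = {e["timestamp"][:10] for e in events}

# With ~200+ distinct days, a predicate on one day touches well under 1%
# of the rows, so once the rows land in many small files (e.g. by forcing
# 'hoodie.parquet.small.file.limit' low), most files become skippable.
# With num_events = 1 there is one file and the skipping ratio stays 0.
print(f"{len(events)} events across {len(distinct_days)} days")
```

Only once the table actually has multiple file groups does "number of files read" have room to drop after the index is created.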
Re: [I] [SUPPORT] RFC 63 Functional Index Hudi 0.1.0-beta [hudi]
codope commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-2275090149

Hi @soumilshah1995, there are two things you can look at. First, when you do EXPLAIN ANALYZE, the plan should show that it is using `HoodieFileIndex`. Second, the "number of files read" in the Spark UI should show fewer files if any files were skipped using the functional index (or any index, for that matter). Another way is to enable debug logs for the org.apache.hudi package when you launch spark-sql; upon executing the query, you will see something called the "skipping ratio", which tells you the percentage of files skipped. Note that if files are already pruned due to partition pruning, and all of the files surviving that pruning still need to be scanned per the query predicate, then the skipping ratio can be 0. Only when there is additional file pruning on top of partition pruning will you find that the skipping ratio is positive.
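[Editor's note] To make the "skipping ratio" mentioned above concrete, here is a minimal sketch of the arithmetic behind it (the helper name is invented for illustration and is not a Hudi API): the fraction of candidate files, after partition pruning, that the index lets the scan avoid.

```python
def skipping_ratio(candidate_files: int, files_after_skipping: int) -> float:
    """Fraction of candidate files eliminated by data skipping.

    candidate_files: files remaining after partition pruning.
    files_after_skipping: files that still must be scanned after
    index-based pruning (e.g. via the functional index).
    """
    if candidate_files == 0:
        return 0.0
    return (candidate_files - files_after_skipping) / candidate_files

# If partition pruning leaves 20 files and the functional index narrows
# the scan to 5 of them, 75% of the files were skipped.
print(skipping_ratio(20, 5))   # -> 0.75

# If every surviving file still matches the predicate, nothing extra is
# skipped and the ratio is 0, exactly the case described above.
print(skipping_ratio(20, 20))  # -> 0.0
```

A positive value therefore always means the index pruned files beyond what partition pruning alone achieved.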
Re: [I] [SUPPORT] RFC 63 Functional Index Hudi 0.1.0-beta [hudi]
soumilshah1995 commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-2252804125

Sagar, could you guide me on which metrics I should focus on? Specifically, should I be looking at query execution time or the total amount of data scanned? What are the key metrics I should pay attention to?
Re: [I] [SUPPORT] RFC 63 Functional Index Hudi 0.1.0-beta [hudi]
soumilshah1995 commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-2250710861

Just updating this thread: I ran a small test.

# Before Index Creation
```
spark.read.format("hudi") \
    .option("hoodie.enable.data.skipping", "true") \
    .option("hoodie.metadata.enable", "true") \
    .option("hoodie.metadata.index.column.stats.enable", "true") \
    .load(path) \
    .createOrReplaceTempView("snapshots")

spark.sql("""
SELECT * FROM snapshots
""").printSchema()

result = spark.sql("""
SELECT event_type, user_id, event_id
FROM snapshots
WHERE date_format(timestamp, 'yyyy-MM-dd') = '2023-06-17'
""")
result.show()
result.explain(True)
```
![image](https://github.com/user-attachments/assets/d6af8bcb-e960-4c69-8a7e-5e84f3c64745)

## After creating Index
```
query_create_ts_datestr = f"""
CREATE INDEX IF NOT EXISTS ts_datestr ON web_events
USING column_stats(timestamp)
OPTIONS(func='from_unixtime', format='yyyy-MM-dd')
"""
result = spark.sql(query_create_ts_datestr)

result = spark.sql("""
SELECT event_type, user_id, event_id
FROM web_events
WHERE date_format(timestamp, 'yyyy-MM-dd') = '2023-06-17'
""")
result.show()
result.explain(True)
```
![image](https://github.com/user-attachments/assets/16777f4b-7fb0-47e6-9683-fb579f85febd)

I do see a difference in query time; it is faster. What else should I check to ensure this is working as expected?
Re: [I] [SUPPORT] RFC 63 Functional Index Hudi 0.1.0-beta [hudi]
soumilshah1995 commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-2247963537

# Code
```
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType
from pyspark.sql.functions import hour, col
from datetime import datetime, timedelta
import os
import sys
import random

# Configuration
HUDI_VERSION = '1.0.0-beta2'
SPARK_VERSION = '3.4'
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Initialize Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

# Generate mock event data
def generate_event_data(num_events):
    event_types = ["click", "view", "purchase", "signup"]
    start_time = datetime(2023, 1, 1)
    data = []
    for i in range(num_events):
        event = {
            "event_id": i + 1,
            "user_id": random.randint(1, 100),
            "event_type": random.choice(event_types),
            "timestamp": (start_time + timedelta(hours=random.randint(0, 5000))).strftime("%Y-%m-%d %H:%M:%S")
        }
        data.append(event)
    return data

# Create DataFrame
num_events = 1
events_data = generate_event_data(num_events)
df = spark.createDataFrame(events_data)
df.show()

# Write DataFrame to Hudi table
table_name = "web_events"
path = f'file:///Users/soumilshah/Desktop/{table_name}/'

df.write.format("hudi") \
    .option("hoodie.table.name", table_name) \
    .option("hoodie.datasource.write.recordkey.field", "event_id") \
    .option("hoodie.datasource.write.partitionpath.field", "") \
    .option("hoodie.datasource.write.precombine.field", "timestamp") \
    .option("hoodie.table.metadata.enable", "true") \
    .option("hoodie.metadata.index.column.stats.enable", "true") \
    .option("path", path) \
    .mode("overwrite") \
    .saveAsTable(table_name)

# Create functional index on timestamp column
query_create_ts_datestr = """
CREATE INDEX IF NOT EXISTS ts_datestr ON web_events
USING column_stats(timestamp)
OPTIONS(func='from_unixtime', format='yyyy-MM-dd')
"""
spark.sql(query_create_ts_datestr).show()

# Query data for a specific date
spark.sql("""
SELECT event_type, user_id, event_id
FROM web_events
WHERE date_format(timestamp, 'yyyy-MM-dd') = '2023-06-17'
""").show()

# Explain query plan for date-based query
spark.sql("""
EXPLAIN SELECT event_type, user_id, event_id
FROM web_events
WHERE date_format(timestamp, 'yyyy-MM-dd') = '2023-06-17'
""").show(truncate=False)

# Create functional index on hour of timestamp
query_create_ts_hour = """
CREATE INDEX ts_hour ON web_events
USING column_stats(timestamp)
OPTIONS(func='hour')
"""
spark.sql(query_create_ts_hour)

# Query data aggregated by hour
spark.sql("""
SELECT hour(timestamp) AS hour_of_day, COUNT(*) AS event_count
FROM web_events
GROUP BY hour(timestamp)
""").show()
```

Questions

How do I verify that the query is using the functional index?

```
spark.sql("""
EXPLAIN SELECT event_type, user_id, event_id
FROM web_events
WHERE date_format(timestamp, 'yyyy-MM-dd') = '2023-06-17'
""").show(truncate=False)

+---+
|plan
```
Re: [I] [SUPPORT] RFC 63 Functional Index Hudi 0.1.0-beta [hudi]
soumilshah1995 commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-2026022754

Vinay, the problem I'm facing is that I struggle with building the JAR myself due to Java version conflicts. That's why I've resorted to using the packaged version to incorporate Hudi 1.0.0-beta. I recall Sagar mentioning that there were some issues with it. He did assure me that they were resolved in the master branch, and he also hinted that the official release would be coming soon. Given these remarks, I'm inclined to think that this version still has some bugs.

I've attached the chats for your reference: https://github.com/apache/hudi/assets/39345855/8c84a852-38db-40ee-9c99-4091dd81bb7c

Regarding our options:

Option A) I could wait for the beta 2 release.
Option B) If it's possible, could you send me the JAR files for Spark 3.4? That way, I can give it a try.

I'm fine with waiting, but if you can provide the JAR files, that works too. Let me know what works best for you.
Re: [I] [SUPPORT] RFC 63 Functional Index Hudi 0.1.0-beta [hudi]
bhat-vinay commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-2025074505

Thanks for trying all the suggestions. I am unable to reproduce this in my environment. I will spend some time next week to set up the exact environment/versions you are using. In the meantime, would it be possible for you to run your tests against the latest master branch of Hudi and try to reproduce this? From the stack trace below, I am not sure why the index build process is stripping the full path of the data file in your environment.

```
24/03/28 07:36:06 WARN ScheduleIndexActionExecutor: Following partitions already exist or inflight: [files]. Going to schedule indexing of only these partitions: [func_index_]
24/03/28 07:36:06 ERROR HoodieBackedTableMetadataWriter: Bootstrap on func_index_ partition failed for file:/Users/soumilshah/Desktop/hudidemo/.hoodie/metadata
org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/2c1b72c3-5de3-4f19-b7dc-e67b3a345c53-0_5-25-0_20240328073553477.parquet.
	at org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1419)
```
Re: [I] [SUPPORT] RFC 63 Functional Index Hudi 0.1.0-beta [hudi]
soumilshah1995 commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-2024977432

Hi, I am still seeing the same issue even after removing file:///

# Error
```
24/03/28 07:36:06 WARN ScheduleIndexActionExecutor: Following partitions already exist or inflight: [files]. Going to schedule indexing of only these partitions: [func_index_]
24/03/28 07:36:06 ERROR HoodieBackedTableMetadataWriter: Bootstrap on func_index_ partition failed for file:/Users/soumilshah/Desktop/hudidemo/.hoodie/metadata
org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/2c1b72c3-5de3-4f19-b7dc-e67b3a345c53-0_5-25-0_20240328073553477.parquet.
	at org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1419)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:757)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:754)
	at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:393)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In[4], line 8
      1 query = """
      2 CREATE INDEX hudi_table_func_index_datestr
      3 ON table_name
      4 USING column_stats(ts)
      5 OPTIONS(func='from_unixtime', format='yyyy-MM-dd')
      6 """
----> 8 spark.sql(query)

File /opt/homebrew/lib/python3.11/site-packages/pyspark/sql/session.py:1440, in SparkSession.sql(self, sqlQuery, args, **kwargs)
   1438 try:
   1439     litArgs = {k: _to_java_column(lit(v)) for k, v in (args or {}).items()}
-> 1440     return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
   1441 finally:
   1442     if len(kwargs) > 0:

File /opt/homebrew/lib/python3.11/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /opt/homebrew/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:169, in capture_sql_exception.<locals>.deco(*a, **kw)
    167 def deco(*a: Any, **kw: Any) -> Any:
    168     try:
--> 169         return f(*a, **kw)
    170     except Py4JJavaError as e:
    171         converted = convert_exception(e.java_exception)

File /opt/homebrew/lib/python3.11/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o34.sql.
: org.apache.hudi.exception.HoodieMetadataException: Failed to index partition [func_index_hudi_table_func_index_datestr]
	at org.apache.hudi.table.action.index.RunIndexActionExecutor.execute(RunIndexActionExecutor.java:179)
	at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.index(HoodieSparkCopyOnWriteTable.java:308)
	at org.apache.hudi.client.BaseHoodieWriteClient.index(BaseHoodi
```
Re: [I] [SUPPORT] RFC 63 Functional Index Hudi 0.1.0-beta [hudi]
bhat-vinay commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-2024338214

```
24/03/27 14:14:10 ERROR HoodieBackedTableMetadataWriter: Bootstrap on func_index_ partition failed for file:/Users/soumilshah/Desktop/hudidemo/.hoodie/metadata
org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/f113c72e-9bf6-4ab1-84b4-b2e9467142a8-0_11-68-0_20240327141353398.parquet.
```

The above lines indicate that the base path for the table was created in "file:/Users/soumilshah/Desktop/hudidemo/.hoodie/metadata", but the functional-index build process is trying to read a data file from "file:/f113c72e-9bf6-4ab1-84b4-b2e9467142a8-0_11-68-0_20240327141353398.parquet". Please try removing the 'file:///' prefix from the path and retry, i.e. the path should be `/Users/soumilshah/Desktop/hudidemo/`.

If you still see the problem, please open a new issue. As I mentioned earlier, the issue you are seeing now is not the same as the original issue that you reported (trying to build an index on a temporary view).
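[Editor's note] The workaround above amounts to handing Hudi a plain filesystem path instead of a `file:` URI. A minimal sketch of that normalization (the helper name is invented for illustration; it is not a Hudi or Spark API):

```python
def strip_local_scheme(path: str) -> str:
    """Drop a local 'file:' scheme prefix so only the plain path remains."""
    # Check the longest prefix first so 'file:///x' doesn't match 'file:/'.
    for prefix in ("file:///", "file://", "file:/"):
        if path.startswith(prefix):
            return "/" + path[len(prefix):].lstrip("/")
    return path

# The path from the thread, with and without the scheme:
print(strip_local_scheme("file:///Users/soumilshah/Desktop/hudidemo/"))
# -> /Users/soumilshah/Desktop/hudidemo/
print(strip_local_scheme("/Users/soumilshah/Desktop/hudidemo/"))
# -> /Users/soumilshah/Desktop/hudidemo/ (already plain, returned unchanged)
```

Passing the plain path avoids the mismatch where the metadata path keeps the scheme but the data-file path loses its directory component.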
Re: [I] [SUPPORT] RFC 63 Functional Index Hudi 0.1.0-beta [hudi]
soumilshah1995 commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-2023493106

Here is the full code

```
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType
from datetime import datetime
import os
import sys

HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

data = [
    [1695159649, '334e26e9-8355-45cc-97c6-c31daf0df330', 'rider-A', 'driver-K', 19.10, 'san_francisco'],
    [1695159649, 'e96c4396-3fad-413a-a942-4cb36106d721', 'rider-C', 'driver-M', 27.70, 'san_francisco'],
]

# Define schema for the DataFrame
schema = StructType([
    StructField("ts", StringType(), True),
    StructField("transaction_id", StringType(), True),
    StructField("rider", StringType(), True),
    StructField("driver", StringType(), True),
    StructField("price", FloatType(), True),
    StructField("location", StringType(), True),
])

# Create Spark DataFrame
df = spark.createDataFrame(data, schema=schema)
df.show()

path = 'file:///Users/soumilshah/Desktop/hudidemo/'

hudi_options = {
    'hoodie.table.name': 'hudi_table_func_index',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': 'transaction_id',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.table.metadata.enable': 'true',
    'hoodie.datasource.write.partitionpath.field': 'location',
    'hoodie.parquet.small.file.limit': '0'
}

df.write.format("hudi").options(**hudi_options).option("path", path).mode("append").saveAsTable("table_name")

spark.read.format("hudi").load(path)

TABLE_NAME = "table_name"
spark.sql(f"""SELECT from_unixtime(ts, 'yyyy-MM-dd') as datestr FROM {TABLE_NAME}""").show()
"""
+----------+
|   datestr|
+----------+
|2023-09-19|
|2023-09-19|
+----------+
"""

query = """
CREATE INDEX hudi_table_func_index_datestr
ON table_name
USING column_stats(ts)
OPTIONS(func='from_unixtime', format='yyyy-MM-dd')
"""
spark.sql(query)
```

# Error
```
24/03/27 14:14:10 WARN ScheduleIndexActionExecutor: Following partitions already exist or inflight: [files]. Going to schedule indexing of only these partitions: [func_index_]
24/03/27 14:14:10 ERROR HoodieBackedTableMetadataWriter: Bootstrap on func_index_ partition failed for file:/Users/soumilshah/Desktop/hudidemo/.hoodie/metadata
org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/f113c72e-9bf6-4ab1-84b4-b2e9467142a8-0_11-68-0_20240327141353398.parquet.
	at org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1419)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:757)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:754)
	at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:393)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1
```
Re: [I] [SUPPORT] RFC 63 Functional Index Hudi 0.1.0-beta [hudi]
soumilshah1995 commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-2023468031

I did give a path:

```
path = 'file:///Users/soumilshah/IdeaProjects/SparkProject/DeltaStreamer/hudi/'
```

Do you want me to change the path and try?
Re: [I] [SUPPORT] RFC 63 Functional Index Hudi 0.1.0-beta [hudi]
bhat-vinay commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-2022668751

Thanks for trying it out. This seems like a different issue (rather than the Spark analysis error that you were seeing earlier). The error reported now is that the base files for the table are not found in the specified location/path, i.e. "file:/072fb7a1-bbd2-481a-9c7a-3ab3e3a70e68-0_11-31-0_2024032708863.parquet" is not being found to build the index. I think you should give the correct `path` for the external table. Something like:

```
df.write.format("hudi").options(**hudi_options).option("path", "/full/path/to/external/table/location/").mode("append").saveAsTable("table_name")
```
Re: [I] [SUPPORT] RFC 63 Functional Index Hudi 0.1.0-beta [hudi]
soumilshah1995 commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-2022643410

Still getting an error. Here is the full code and error message; please let me know what I am missing.

# Code
```
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType
from datetime import datetime
import os
import sys

HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

data = [
    [1695159649, '334e26e9-8355-45cc-97c6-c31daf0df330', 'rider-A', 'driver-K', 19.10, 'san_francisco'],
    [1695159649, 'e96c4396-3fad-413a-a942-4cb36106d721', 'rider-C', 'driver-M', 27.70, 'san_francisco'],
]

# Define schema for the DataFrame
schema = StructType([
    StructField("ts", StringType(), True),
    StructField("transaction_id", StringType(), True),
    StructField("rider", StringType(), True),
    StructField("driver", StringType(), True),
    StructField("price", FloatType(), True),
    StructField("location", StringType(), True),
])

# Create Spark DataFrame
df = spark.createDataFrame(data, schema=schema)
df.show()

path = 'file:///Users/soumilshah/IdeaProjects/SparkProject/DeltaStreamer/hudi/'

hudi_options = {
    'hoodie.table.name': 'hudi_table_func_index',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': 'transaction_id',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.table.metadata.enable': 'true',
    'hoodie.datasource.write.partitionpath.field': 'location',
    'hoodie.parquet.small.file.limit': '0'
}

df.write.format("hudi").options(**hudi_options).option("path", path).mode("append").saveAsTable("table_name")

query = """
CREATE INDEX hudi_table_func_index_datestr
ON table_name
USING column_stats(ts)
OPTIONS(func='from_unixtime', format='yyyy-MM-dd')
"""
spark.sql(query)
```

## Error
```
24/03/27 08:26:00 WARN ScheduleIndexActionExecutor: Following partitions already exist or inflight: [files]. Going to schedule indexing of only these partitions: [func_index_]
24/03/27 08:26:00 ERROR HoodieBackedTableMetadataWriter: Bootstrap on func_index_ partition failed for file:/Users/soumilshah/IdeaProjects/SparkProject/DeltaStreamer/hudi/.hoodie/metadata
org.apache.spark.sql.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/072fb7a1-bbd2-481a-9c7a-3ab3e3a70e68-0_11-31-0_2024032708863.parquet.
	at org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1419)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:757)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:754)
	at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:393)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.Fork
```
Re: [I] [SUPPORT] RFC 63 Functional Index Hudi 0.1.0-beta [hudi]
soumilshah1995 commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-2022590616

let me go ahead and try this
Re: [I] [SUPPORT] RFC 63 Functional Index Hudi 0.1.0-beta [hudi]
bhat-vinay commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-2020640947

The issue is that `spark.read.format("hudi").load(PATH).createOrReplaceTempView(TABLE_NAME)` creates a temporary view (similar to one created with `CREATE TEMPORARY VIEW ...`), which is neither a table nor a Hudi-managed table. Hence the subsequent `CREATE INDEX ...` statement to create a functional index fails, because the object on which the index is being created is not a Hudi-managed table. Instead of creating a temporary view, you can use the `saveAsTable(...)` method on the DataFrameWriter object to create a Hudi-managed table, and then create the functional index on that table. Please see if this works for you:

```
df.write.format("hudi").options(**hudi_options).option("path", "/external/table/path").mode("append").saveAsTable("table_name")

spark.sql("CREATE INDEX hudi_table_func_index_datestr ON table_name USING column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')")
```
soumilshah1995 commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-1935079406

Is this release out?
soumilshah1995 commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-1893758507

Great
zyclove commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-1891184654

@danny0405 @codope When will version 1.0.0 be released? There has been no update for a long time since the beta version.
soumilshah1995 commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-1817313322

Thanks
ad1happy2go commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-1815852059

@soumilshah1995 @codope Created a JIRA to track this issue - https://issues.apache.org/jira/browse/HUDI-7117
soumilshah1995 commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-1815424659

# Code
```
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType
from datetime import datetime
import os
import sys

HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

data = [
    [1695159649, '334e26e9-8355-45cc-97c6-c31daf0df330', 'rider-A', 'driver-K', 19.10, 'san_francisco'],
    [1695159649, 'e96c4396-3fad-413a-a942-4cb36106d721', 'rider-C', 'driver-M', 27.70, 'san_francisco'],
]

# Define schema for the DataFrame
schema = StructType([
    StructField("ts", StringType(), True),
    StructField("transaction_id", StringType(), True),
    StructField("rider", StringType(), True),
    StructField("driver", StringType(), True),
    StructField("price", FloatType(), True),
    StructField("location", StringType(), True),
])

# Create Spark DataFrame
df = spark.createDataFrame(data, schema=schema)
df.show()

path = 'file:///Users/soumilnitinshah/Downloads/hudidb/hudi_table_func_index'
hudi_options = {
    'hoodie.table.name': 'hudi_table_func_index',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': 'transaction_id',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.table.metadata.enable': 'true',
    'hoodie.datasource.write.partitionpath.field': 'location',
    'hoodie.parquet.small.file.limit': '0'
}
df.write.format("hudi").options(**hudi_options).mode("append").save(path)

PATH = 'file:///Users/soumilnitinshah/Downloads/hudidb/hudi_table_func_index'
TABLE_NAME = "hudi_table_func_index"
spark.read.format("hudi").load(PATH).createOrReplaceTempView(TABLE_NAME)

spark.sql(f"""SELECT from_unixtime(ts, 'yyyy-MM-dd') as datestr FROM {TABLE_NAME}""").show()
spark.sql(f"""CREATE INDEX {TABLE_NAME}_datestr ON {TABLE_NAME} USING column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')""")
```

# Error
```
+----------+
|   datestr|
+----------+
|2023-09-19|
|2023-09-19|
+----------+

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In[7], line 7
      4 spark.read.format("hudi").load(PATH).createOrReplaceTempView(TABLE_NAME)
      6 spark.sql(f"""SELECT from_unixtime(ts, 'yyyy-MM-dd') as datestr FROM {TABLE_NAME}""").show()
----> 7 spark.sql(f"""CREATE INDEX {TABLE_NAME}_datestr ON {TABLE_NAME} USING column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')""")

File ~/anaconda3/lib/python3.11/site-packages/pyspark/sql/session.py:1440, in SparkSession.sql(self, sqlQuery, args, **kwargs)
   1438 try:
   1439     litArgs = {k: _to_java_column(lit(v)) for k, v in (args or {}).items()}
-> 1440     return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
   1441 finally:
   1442     if len(kwargs) > 0:

File ~/anaconda3/lib/python3.11/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File ~/anaconda3/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:169, in capture_sql_exception.<locals>.deco(*a, **kw)
    167 def deco(*a: Any, **kw: Any) -> Any:
    168     try:
--> 169         return f(*a, **kw)
    170     except Py4JJavaError as e:
    171         converted = convert_exception(e.java_exception)

File ~/anaconda3
```
ad1happy2go commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-1814968575

@soumilshah1995 I think there is a slight issue with your query. I tried the code below, for which the parsing works, although I am now facing another problem: `No plan for CreateIndex hudi_func_index_01a19d91_699f_4ead_afe1_82a232f91efa_datestr, column_stats, false,`. Can you try it once?

```
# Sample data
data = [
    [1695159649, '334e26e9-8355-45cc-97c6-c31daf0df330', 'rider-A', 'driver-K', 19.10, 'san_francisco'],
    [1695159649, 'e96c4396-3fad-413a-a942-4cb36106d721', 'rider-C', 'driver-M', 27.70, 'san_francisco'],
]

# Define schema for the DataFrame
schema = StructType([
    StructField("ts", LongType(), True),
    StructField("transaction_id", StringType(), True),
    StructField("rider", StringType(), True),
    StructField("driver", StringType(), True),
    StructField("price", FloatType(), True),
    StructField("location", StringType(), True),
])

# Create Spark DataFrame
df = spark.createDataFrame(data, schema=schema)
df.show()

hudi_options = {
    'hoodie.table.name': TABLE_NAME,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': 'transaction_id',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.datasource.write.partitionpath.field': 'location',
    'hoodie.parquet.small.file.limit': '0'
}
df.write.format("hudi").options(**hudi_options).mode("append").save(PATH)

spark.read.format("hudi").load(PATH).createOrReplaceTempView(TABLE_NAME)

spark.sql(f"""SELECT from_unixtime(ts, 'yyyy-MM-dd') as datestr FROM {TABLE_NAME}""").show()
spark.sql(f"""CREATE INDEX {TABLE_NAME}_datestr ON {TABLE_NAME} USING column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')""")
```
soumilshah1995 commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-1814482504

Here is the entirety of my code, and I'm encountering persistent issues. I attempted to contact you through a personal message on Slack for more rapid communication, hoping to expedite the resolution process. Please let me know if I've made any errors; I'm still in the early stages of learning and might be overlooking certain aspects. I'm eager to learn, progress, and eventually contribute my knowledge to the Hudi community.

```
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType
from datetime import datetime
import os
import sys

HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

# Sample data
data = [
    ['2023-09-20 03:58:59', '334e26e9-8355-45cc-97c6-c31daf0df330', 'rider-A', 'driver-K', 19.10, 'san_francisco'],
    ['2023-09-19 08:46:34', 'e96c4396-3fad-413a-a942-4cb36106d721', 'rider-C', 'driver-M', 27.70, 'san_francisco'],
]

# Define schema for the DataFrame
schema = StructType([
    StructField("ts", StringType(), True),
    StructField("transaction_id", StringType(), True),
    StructField("rider", StringType(), True),
    StructField("driver", StringType(), True),
    StructField("price", FloatType(), True),
    StructField("location", StringType(), True),
])

# Create Spark DataFrame
df = spark.createDataFrame(data, schema=schema)
df.show()

path = 'file:///Users/soumilnitinshah/Downloads/hudidb/hudi_table_func_index'
hudi_options = {
    'hoodie.table.name': 'hudi_table_func_index',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': 'transaction_id',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.table.metadata.enable': 'true',
    'hoodie.datasource.write.partitionpath.field': 'location'
}
df.write.format("hudi").options(**hudi_options).mode("append").save(path)

# Register the Hudi table
spark.read.format("hudi").load(path).createOrReplaceTempView("hudi_table_func_index")

# Create the functional index
functional_index_sql = """
CREATE INDEX ts_functional_index
ON hudi_table_func_index(ts)
USING column_stats
OPTIONS (func='from_unixtime', format='yyyy-MM-dd HH:mm:ss');
"""
spark.sql(functional_index_sql)
```

# Error
```
---------------------------------------------------------------------------
ParseException                            Traceback (most recent call last)
Cell In[2], line 12
      4 # Create the functional index
      5 functional_index_sql = """
      6 CREATE INDEX ts_functional_index
      7 ON hudi_table_func_index(ts)
      8 USING column_stats
      9 OPTIONS (func='from_unixtime', format='yyyy-MM-dd HH:mm:ss');
     10 """
---> 12 spark.sql(functional_index_sql)

File ~/anaconda3/lib/python3.11/site-packages/pyspark/sql/session.py:1440, in SparkSession.sql(self, sqlQuery, args, **kwargs)
   1438 try:
   1439     litArgs = {k: _to_java_column(lit(v)) for k, v in (args or {}).items()}
-> 1440     return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
   1441 finally:
   1442     if len(kwargs) > 0:

File ~/anaconda3/lib/python3.11/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File ~/anaconda3/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:175, in capture_sql_exception.<locals>.deco(*a, **kw)
    171 converted = convert_exception(e.java_exception)
    172 if not isinstance(converted, UnknownException):
    173     # Hide where the exception came from that shows
```
codope commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-1814463898

It should work with 1.0.0-beta1 with spark-sql. In the DDL page, I have provided an example with the `hour` function. You can replace it with `from_unixtime`. This is what I tried, and it works.

```
DROP TABLE IF EXISTS hudi_table;
CREATE TABLE hudi_table (
    ts BIGINT,
    uuid STRING,
    rider STRING,
    driver STRING,
    fare DOUBLE,
    city STRING
) USING HUDI
tblproperties (primaryKey = 'uuid')
PARTITIONED BY (city)
location 'file:///tmp/hudi_func_index';

-- disable small file handling so that each insert creates a new file --
set hoodie.parquet.small.file.limit=0;

-- records with ts in [2023-09-18, 2023-09-24] --
INSERT INTO hudi_table VALUES (1695159649,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco');
INSERT INTO hudi_table VALUES (1695091554,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70,'san_francisco');
INSERT INTO hudi_table VALUES (1695046462,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90,'san_francisco');
INSERT INTO hudi_table VALUES (1695332066,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco');
INSERT INTO hudi_table VALUES (1695516137,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo');
INSERT INTO hudi_table VALUES (1695376420,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40,'sao_paulo');
INSERT INTO hudi_table VALUES (1695173887,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06,'chennai');
INSERT INTO hudi_table VALUES (1695115999,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');

SELECT city, fare, rider, driver, from_unixtime(ts, 'yyyy-MM-dd') as datestr FROM hudi_table;
san_francisco	33.9	rider-D	driver-L	2023-09-18
san_francisco	27.7	rider-C	driver-M	2023-09-19
san_francisco	93.5	rider-E	driver-O	2023-09-22
san_francisco	19.1	rider-A	driver-K	2023-09-20
sao_paulo	43.4	rider-G	driver-Q	2023-09-22
sao_paulo	34.15	rider-F	driver-P	2023-09-24
chennai	17.85	rider-J	driver-T	2023-09-19
chennai	41.06	rider-I	driver-S	2023-09-20

SELECT city, fare, rider, driver FROM hudi_table WHERE city NOT IN ('chennai') AND from_unixtime(ts, 'yyyy-MM-dd') > '2023-09-20';

CREATE INDEX datestr ON hudi_table USING column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd');

SELECT city, fare, rider, driver FROM hudi_table WHERE from_unixtime(ts, 'yyyy-MM-dd') > '2023-09-20';
san_francisco	93.5	rider-E	driver-O
sao_paulo	43.4	rider-G	driver-Q
sao_paulo	34.15	rider-F	driver-P
Time taken: 0.428 seconds, Fetched 3 row(s)
```
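As a sanity check on the epochs in the INSERT statements above, `from_unixtime(ts, 'yyyy-MM-dd')` is just an epoch-seconds-to-date conversion. A minimal Python sketch, rendered in UTC (note Spark's `from_unixtime` uses the session time zone, so dates can shift by a day near midnight, which is why some outputs in this thread show 2023-09-20 for the same epoch):

```python
from datetime import datetime, timezone

def to_datestr(epoch_seconds: int, fmt: str = "%Y-%m-%d") -> str:
    """Roughly mirror from_unixtime(ts, 'yyyy-MM-dd'), rendered in UTC."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).strftime(fmt)

# Epochs taken from the sample rows above
print(to_datestr(1695159649))  # 2023-09-19 in UTC
print(to_datestr(1695516137))  # 2023-09-24 in UTC
```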
soumilshah1995 commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-1814400817

Here is the full code for reference:

```
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType
from datetime import datetime

HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

data = [
    [datetime.strptime('2023-09-20 03:58:59', '%Y-%m-%d %H:%M:%S'), '334e26e9-8355-45cc-97c6-c31daf0df330', 'rider-A', 'driver-K', 19.10, 'san_francisco'],
    [datetime.strptime('2023-09-19 08:46:34', '%Y-%m-%d %H:%M:%S'), 'e96c4396-3fad-413a-a942-4cb36106d721', 'rider-C', 'driver-M', 27.70, 'san_francisco'],
    [datetime.strptime('2023-09-18 17:45:31', '%Y-%m-%d %H:%M:%S'), '9909a8b1-2d15-4d3d-8ec9-efc48c536a00', 'rider-D', 'driver-L', 33.90, 'san_francisco'],
    [datetime.strptime('2023-09-22 13:12:56', '%Y-%m-%d %H:%M:%S'), '1dced545-862b-4ceb-8b43-d2a568f6616b', 'rider-E', 'driver-O', 93.50, 'san_francisco'],
    [datetime.strptime('2023-09-24 06:15:45', '%Y-%m-%d %H:%M:%S'), 'e3cf430c-889d-4015-bc98-59bdce1e530c', 'rider-F', 'driver-P', 34.15, 'sao_paulo'],
    [datetime.strptime('2023-09-22 15:21:36', '%Y-%m-%d %H:%M:%S'), '7a84095f-737f-40bc-b62f-6b69664712d2', 'rider-G', 'driver-Q', 43.40, 'sao_paulo'],
    [datetime.strptime('2023-09-20 12:35:45', '%Y-%m-%d %H:%M:%S'), '3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04', 'rider-I', 'driver-S', 41.06, 'chennai'],
    [datetime.strptime('2023-09-19 05:34:56', '%Y-%m-%d %H:%M:%S'), 'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa', 'rider-J', 'driver-T', 17.85, 'chennai']
]

# Define schema for the DataFrame
schema = StructType([
    StructField("ts", TimestampType(), True),
    StructField("transaction_id", StringType(), True),
    StructField("rider", StringType(), True),
    StructField("driver", StringType(), True),
    StructField("price", FloatType(), True),
    StructField("location", StringType(), True),
])

# Create Spark DataFrame
df = spark.createDataFrame(data, schema=schema)

# Show the DataFrame
df.show()

db_name = "hudidb"
table_name = "hudi_table_func_index"
path = f"file:///Users/soumilnitinshah/Downloads/{db_name}/{table_name}"  # Updated path
method = 'upsert'
table_type = "COPY_ON_WRITE"
recordkey = "transaction_id"
precombine = "ts"
partition_fields = "location"

hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.precombine.field': precombine,
    "hoodie.table.metadata.enable": "true",
    "hoodie.datasource.write.partitionpath.field": partition_fields
}

df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)

# path = 'file:///Users/soumilnitinshah/Downloads/hudidb/hudi_table_func_index'
# spark.read.format("hudi").load(path).createOrReplaceTempView("hudi_table_func_index")

functional_index_sql = """
CREATE INDEX IF NOT EXISTS ts_hour
ON hudi_table_func_index
USING column_stats(ts)
"""
spark.sql(functional_index_sql)
```
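One difference worth noting between this snippet and the working spark-sql example above: `from_unixtime` operates on epoch seconds (a `BIGINT`), whereas this schema stores `ts` as a `TimestampType`. A small Python sketch of that conversion (a hypothetical helper, treating the strings as UTC):

```python
import calendar
from datetime import datetime, timezone

def to_epoch_seconds(ts_str: str) -> int:
    """Convert 'YYYY-MM-DD HH:MM:SS' (treated as UTC) to Unix epoch seconds,
    the BIGINT form that from_unixtime(ts, ...) operates on."""
    dt = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
    return calendar.timegm(dt.timetuple())

# Round-trip one of the sample timestamps above
epoch = to_epoch_seconds("2023-09-20 03:58:59")
print(epoch, datetime.fromtimestamp(epoch, tz=timezone.utc))
```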
soumilshah1995 commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-1814378726

Would it work with Spark SQL on Hudi '1.0.0-beta1', or do we need to wait for '1.0.0-beta2'? Can you please provide an example in PySpark of how to use it? I cannot seem to get it to work.

# Tried
```
+--------------------+--------------------+--------+-----+----------+
|      transaction_id|          product_id|quantity|price|        ts|
+--------------------+--------------------+--------+-----+----------+
|8ad35260-10ce-468...|977d50db-9970-41c...|       2|   24|1121221702|
|2cbe47c6-c7fc-472...|1f2bceff-8056-44e...|       1|   62|1527868645|
+--------------------+--------------------+--------+-----+----------+
```

# Code
```
path = 'file:///Users/soumilnitinshah/Downloads/hudidb/ecommerce_table'
spark.read.format("hudi").load(path).createOrReplaceTempView("ecommerce_table")

# Assuming the functional index syntax follows the proposed RFC syntax
functional_index_sql = """
CREATE INDEX IF NOT EXISTS ts_functional_idx
ON TABLE ecommerce_table
USING column_stats(ts)
OPTIONS (func='from_unixtime', format='yyyy-MM-dd')
"""
spark.sql(functional_index_sql)
```

# Error Logs
```
Py4JJavaError: An error occurred while calling o34.sql.
: java.lang.ClassCastException: class org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias cannot be cast to class org.apache.spark.sql.catalyst.analysis.ResolvedTable (org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias and org.apache.spark.sql.catalyst.analysis.ResolvedTable are in unnamed module of loader 'app')
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFieldNameAndPosition$$anonfun$apply$58.applyOrElse(Analyzer.scala:3671)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFieldNameAndPosition$$anonfun$apply$58.applyOrElse(Analyzer.scala:3668)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:111)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:110)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFieldNameAndPosition$.apply(Analyzer.scala:3668)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFieldNameAndPosition$.apply(Analyzer.scala:3667)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:228)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:224)
	at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:224)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execut
```
codope commented on issue #10110: URL: https://github.com/apache/hudi/issues/10110#issuecomment-1813780193

Hi @soumilshah1995, thanks for giving it a try! Currently, the `FUNCTION` keyword is not integrated. I need to update the RFC with the exact syntax, which can be found in the SQL DDL docs - https://hudi.apache.org/docs/next/sql_ddl#create-index-experimental

We are tracking an issue to simplify the syntax. Ideally, we want users to be able to just say `CREATE INDEX func_index_abc on xyz_hudi_table USING column_stats(hour(ts))` without using the `FUNCTION` keyword or providing extra options to specify the function. We will have it in 1.0 GA. Feel free to reach out to me directly on Hudi Slack if you're more interested in this feature.
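To make the intent of `column_stats(hour(ts))` concrete: a functional index stores per-file statistics (e.g. min/max) of the function's output, so the planner can skip files whose range cannot match a predicate. A toy Python illustration of that pruning idea (not Hudi's implementation; the file names and row-to-file assignment are made up, with epochs from this thread and hours computed in UTC):

```python
from datetime import datetime, timezone

# Hypothetical data files: name -> epoch-second timestamps of the rows they hold.
files = {
    "file-1.parquet": [1695159649, 1695091554],  # hours 21 and 2 (UTC)
    "file-2.parquet": [1695516137, 1695376420],  # hours 0 and 9 (UTC)
}

def hour_utc(ts: int) -> int:
    """hour(ts) evaluated in UTC."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).hour

# Per-file (min, max) of hour(ts): the kind of stats a functional index keeps.
stats = {name: (min(map(hour_utc, vals)), max(map(hour_utc, vals)))
         for name, vals in files.items()}

# A predicate like hour(ts) = 21 only needs files whose stats range covers 21.
candidates = [name for name, (lo, hi) in stats.items() if lo <= 21 <= hi]
print(stats)
print(candidates)  # only file-1.parquet can contain matching rows
```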