[ 
https://issues.apache.org/jira/browse/HUDI-7360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raymond Xu updated HUDI-7360:
-----------------------------
    Component/s: incremental-query

> Incremental CDC Query after 0.14.1 upgrade giving Jackson class 
> incompatibility exception
> -----------------------------------------------------------------------------------------
>
>                 Key: HUDI-7360
>                 URL: https://issues.apache.org/jira/browse/HUDI-7360
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: incremental-query, reader-core
>            Reporter: Aditya Goenka
>            Priority: Critical
>             Fix For: 1.1.0
>
>
> GitHub issue: [https://github.com/apache/hudi/issues/10590]
> Reproducible code:
> ```
> from typing import Any
> from pyspark import Row
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import col
>
> spark = SparkSession.builder \
>     .appName("Hudi Basics") \
>     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
>     .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1") \
>     .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
>     .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
>     .getOrCreate()
> sc = spark.sparkContext
> table_name = "hudi_trips_cdc"
> base_path = "/tmp/test_issue_10590_4"  # Replace with whatever path
> quickstart_utils = sc._jvm.org.apache.hudi.QuickstartUtils
> dataGen = quickstart_utils.DataGenerator()
> inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
>
> def create_df():
>     df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
>     return df
>
> def write_data():
>     df = create_df()
>     hudi_options = {
>         "hoodie.table.name": table_name,
>         "hoodie.datasource.write.recordkey.field": "uuid",
>         # This can be either MoR or CoW and the error will still happen
>         "hoodie.datasource.write.table.type": "MERGE_ON_READ",
>         "hoodie.datasource.write.partitionpath.field": "partitionpath",
>         "hoodie.datasource.write.table.name": table_name,
>         "hoodie.datasource.write.operation": "upsert",
>         # This can be left enabled, and won't affect anything unless actually queried as CDC
>         "hoodie.table.cdc.enabled": "true",
>         "hoodie.datasource.write.precombine.field": "ts",
>         "hoodie.upsert.shuffle.parallelism": 2,
>         "hoodie.insert.shuffle.parallelism": 2
>     }
>     df.write.format("hudi") \
>         .options(**hudi_options) \
>         .mode("overwrite") \
>         .save(base_path)
>
> def update_data():
>     updates = quickstart_utils.convertToStringList(dataGen.generateUpdates(10))
>     df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
>     df.write \
>         .format("hudi") \
>         .mode("append") \
>         .save(base_path)
>
> def incremental_query():
>     ordered_rows: list[Row] = spark.read \
>         .format("hudi") \
>         .load(base_path) \
>         .select(col("_hoodie_commit_time").alias("commit_time")) \
>         .orderBy(col("commit_time")) \
>         .collect()
>     commits: list[Any] = list(map(lambda row: row[0], ordered_rows))
>     begin_time = commits[0]
>     incremental_read_options = {
>         # Uncomment this line to query as CDC, crashes in 0.14.1
>         'hoodie.datasource.query.incremental.format': "cdc",
>         'hoodie.datasource.query.type': 'incremental',
>         'hoodie.datasource.read.begin.instanttime': begin_time,
>     }
>     trips_incremental_df = spark.read \
>         .format("hudi") \
>         .options(**incremental_read_options) \
>         .load(base_path)
>     # Error also occurs when using the "from_hudi_table_changes" in 0.14.1
>     # sql_query = f""" SELECT * FROM hudi_table_changes ('{base_path}', 'cdc', 'earliest')"""
>     # trips_incremental_df = spark.sql(sql_query)
>     trips_incremental_df.show()
>     trips_incremental_df.printSchema()
>
> if __name__ == "__main__":
>     write_data()
>     update_data()
>     incremental_query()
> ```
>  
>  
>  
>  
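
The comments in the reproduction suggest that the failure appears only when the CDC format option is set on the incremental read. One way to check this could be to build both option sets from a single place and run the same query with each. The sketch below assumes nothing beyond the option keys already used in the script; `build_incremental_options` is a hypothetical helper and not part of Hudi, and the instant time is a made-up placeholder.

```python
from typing import Dict


def build_incremental_options(begin_time: str, as_cdc: bool) -> Dict[str, str]:
    """Return Hudi read options for an incremental query.

    Hypothetical helper for illustration; only the option keys are taken
    from the reproduction script.
    """
    options = {
        "hoodie.datasource.query.type": "incremental",
        "hoodie.datasource.read.begin.instanttime": begin_time,
    }
    if as_cdc:
        # This is the option the report associates with the 0.14.1 failure.
        options["hoodie.datasource.query.incremental.format"] = "cdc"
    return options


# Placeholder instant time; a real run would take it from _hoodie_commit_time.
plain = build_incremental_options("20240101000000000", as_cdc=False)
cdc = build_incremental_options("20240101000000000", as_cdc=True)
```

Passing `plain` and then `cdc` to `spark.read.format("hudi").options(**opts).load(base_path)` in the script above should show whether only the CDC variant raises the exception.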



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
