[ https://issues.apache.org/jira/browse/HUDI-7360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Raymond Xu updated HUDI-7360:
-----------------------------
    Component/s: incremental-query

Incremental CDC Query after 0.14.1 upgrade giving Jackson class incompatibility exception
------------------------------------------------------------------------------------------

                Key: HUDI-7360
                URL: https://issues.apache.org/jira/browse/HUDI-7360
            Project: Apache Hudi
         Issue Type: Bug
         Components: incremental-query, reader-core
           Reporter: Aditya Goenka
           Priority: Critical
            Fix For: 1.1.0

Github Issue - https://github.com/apache/hudi/issues/10590

Reproducible code:

```
from typing import Any

from pyspark import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("Hudi Basics") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

sc = spark.sparkContext

table_name = "hudi_trips_cdc"
base_path = "/tmp/test_issue_10590_4"  # Replace with any path

quickstart_utils = sc._jvm.org.apache.hudi.QuickstartUtils
dataGen = quickstart_utils.DataGenerator()
inserts = quickstart_utils.convertToStringList(dataGen.generateInserts(10))


def create_df():
    df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
    return df


def write_data():
    df = create_df()
    hudi_options = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.recordkey.field": "uuid",
        # This can be either MoR or CoW and the error will still happen
        "hoodie.datasource.write.table.type": "MERGE_ON_READ",
        "hoodie.datasource.write.partitionpath.field": "partitionpath",
        "hoodie.datasource.write.table.name": table_name,
        "hoodie.datasource.write.operation": "upsert",
        # This can be left enabled; it won't affect anything unless the table is actually queried as CDC
        "hoodie.table.cdc.enabled": "true",
        "hoodie.datasource.write.precombine.field": "ts",
        "hoodie.upsert.shuffle.parallelism": 2,
        "hoodie.insert.shuffle.parallelism": 2,
    }
    df.write.format("hudi") \
        .options(**hudi_options) \
        .mode("overwrite") \
        .save(base_path)


def update_data():
    updates = quickstart_utils.convertToStringList(dataGen.generateUpdates(10))
    df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
    df.write \
        .format("hudi") \
        .mode("append") \
        .save(base_path)


def incremental_query():
    ordered_rows: list[Row] = spark.read \
        .format("hudi") \
        .load(base_path) \
        .select(col("_hoodie_commit_time").alias("commit_time")) \
        .orderBy(col("commit_time")) \
        .collect()
    commits: list[Any] = list(map(lambda row: row[0], ordered_rows))
    begin_time = commits[0]
    incremental_read_options = {
        # With this line present the query runs as CDC and crashes in 0.14.1;
        # remove it to run a plain incremental query, which works
        "hoodie.datasource.query.incremental.format": "cdc",
        "hoodie.datasource.query.type": "incremental",
        "hoodie.datasource.read.begin.instanttime": begin_time,
    }
    trips_incremental_df = spark.read \
        .format("hudi") \
        .options(**incremental_read_options) \
        .load(base_path)
    # The error also occurs when using the "hudi_table_changes" TVF in 0.14.1:
    # sql_query = f"""SELECT * FROM hudi_table_changes ('{base_path}', 'cdc', 'earliest')"""
    # trips_incremental_df = spark.sql(sql_query)
    trips_incremental_df.show()
    trips_incremental_df.printSchema()


if __name__ == "__main__":
    write_data()
    update_data()
    incremental_query()
```
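For triage, a minimal diagnostic sketch (an editorial addition, not part of the original report) that prints the jackson-databind version actually loaded in the driver JVM; a mismatch between the Jackson classes pulled in by the Hudi bundle and those shipped with Spark is one plausible source of this kind of class-incompatibility error:

```
# Hedged diagnostic sketch (assumption: the conflict involves
# com.fasterxml.jackson classes on the driver classpath).
# Run inside the same SparkSession as the reproducer above.
mapper = spark.sparkContext._jvm.com.fasterxml.jackson.databind.ObjectMapper()
print(mapper.version().toString())  # e.g. a 2.13.x version on Spark 3.3.x
```

If the printed version differs from what Hudi 0.14.1 was built against, aligning the Jackson jars on the classpath would be the usual direction to investigate.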