ad1happy2go commented on issue #8625: URL: https://github.com/apache/hudi/issues/8625#issuecomment-1540021140
@chinmay-032 I just tried to reproduce with OSS version. It's very clear that OSS hudi doesn't contain partial update class. I got Class not Found error for PartialUpdateAvroPayload. Code pasted below. Now I do understand your scenario when you are saying AWS provided code have this class. We need to check with AWS team for that. I recommend you using OSS hudi 0.13.0 version which should ideally work. ``` from pyspark.sql.types import * from pyspark.sql.functions import * import time from datetime import datetime from pyspark.sql import SparkSession from pyspark.sql import Row from datetime import date spark = SparkSession \ .builder \ .master("local[1]") \ .config("spark.driver.memory", "8g") \ .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.2") \ .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \ .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \ .getOrCreate() common_config={ "hoodie.datasource.write.table.type": "COPY_ON_WRITE", "hoodie.datasource.write.recordkey.field": "id", "hoodie.datasource.write.precombine.field": "timestamp", "hoodie.datasource.write.partitionpath.field": "timestamp__date_", 'hoodie.table.name': 'issue_8625', 'hoodie.index.type': 'GLOBAL_BLOOM', 'hoodie.bloom.index.update.partition.path': 'false', "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.TimestampBasedKeyGenerator", "hoodie.deltastreamer.keygen.timebased.timestamp.type": "SCALAR", "hoodie.deltastreamer.keygen.timebased.output.dateformat": 'yyyy/MM/dd', "hoodie.deltastreamer.keygen.timebased.timezone": "GMT", "hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit": "DAYS", "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.PartialUpdateAvroPayload", "hoodie.compaction.payload.class": "org.apache.hudi.common.model.PartialUpdateAvroPayload", "hoodie.datasource.compaction.async.enable": "false", "hoodie.payload.ordering.field": "timestamp" } def get_structfield(fieldname, typestring): if typestring == "NUMBER": return StructField(fieldname, DoubleType(), True) elif typestring == "BOOLEAN": return StructField(fieldname, BooleanType(), True) elif typestring == "RELATIVE_TIME": return StructField(fieldname, TimestampType(), True) elif typestring == "LIST_DOUBLE": return StructField(fieldname, ArrayType(DoubleType(), False), True) elif typestring == "LIST_STRING": return StructField(fieldname, ArrayType(StringType(), False), True) return StructField(fieldname, StringType(), True) def write_to_hudi(df): t1 = time.time() table_name = "issue_8625" try: df.write.format("org.apache.hudi") \ .options(**common_config) \ .option('hoodie.table.name', table_name) \ .option('hoodie.datasource.hive_sync.table', table_name) \ .mode("append") \ .save("s3a://path-to-private-table/" + table_name + "/") except Exception as e: print(f"Table {table_name}: Error while writing. Job aborted {str(e)}") t2 = time.time() print(f"Table {table_name}: Writing to hudi completed in {(t2-t1):.2f} seconds") return True # Define the schema schema = StructType([ StructField("id", IntegerType(), True), StructField("b", IntegerType(), True), StructField("c", StringType(), True), StructField("d", DateType(), True), StructField("timestamp", TimestampType(), True), StructField("f", StringType(), True) ]) df = spark.createDataFrame([ Row(id=1, b=2, c='string1', d=date(2000, 1, 1), timestamp=datetime.strptime("2000-01-01 12:00:00", '%Y-%m-%d %H:%M:%S'), f = "2000-01-01 12:00:00"), Row(id=2, b=3, c='string2', d=date(2000, 2, 1), timestamp=datetime.strptime("2000-01-01 12:00:00", '%Y-%m-%d %H:%M:%S'), f = "2000-01-01 12:21:20"), Row(id=4, b=5, c='string3', d=date(2000, 3, 1), timestamp=datetime.strptime("2000-01-01 12:00:00", '%Y-%m-%d %H:%M:%S'), f = "2000-01-03 12:00:00") ], schema) df.write.format("org.apache.hudi") \ .options(**common_config) \ .mode("append") \ .save("file:///tmp/issue_8625_2") newdf = spark.createDataFrame([ Row(id=4, b=16, c=None, d=None, timestamp=datetime.strptime("2000-01-01 12:35:00", '%Y-%m-%d %H:%M:%S'), f=None) ], schema) newdf.show() newdf.write.format("org.apache.hudi") \ .options(**common_config) \ .mode("append") \ .save("file:///tmp/issue_8625_2") ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org