ad1happy2go commented on issue #8625:
URL: https://github.com/apache/hudi/issues/8625#issuecomment-1540021140

   @chinmay-032  I just tried to reproduce this with the OSS version. It is clear 
that OSS Hudi doesn't contain the partial update payload class.
   I got a class-not-found error for PartialUpdateAvroPayload. Code pasted below.
   
   Now I understand your scenario: you are saying that the AWS-provided build 
has this class. We need to check with the AWS team about that. I recommend 
using OSS Hudi 0.13.0, which should ideally work.
   
   
   ```
   from pyspark.sql.types import *
   from pyspark.sql.functions import *
   import time
   from datetime import datetime
   from pyspark.sql import SparkSession
   from pyspark.sql import Row
   from datetime import date
   
   # Local, single-core Spark session. The Hudi 0.12.2 bundle for Spark 3.2 is
   # requested through spark.jars.packages, and the Hudi SQL extension and
   # catalog are registered on the session.
   spark = (
       SparkSession.builder
       .master("local[1]")
       .config("spark.driver.memory", "8g")
       .config(
           "spark.jars.packages",
           "org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.2",
       )
       .config(
           "spark.sql.extensions",
           "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
       )
       .config(
           "spark.sql.catalog.spark_catalog",
           "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
       )
       .config(
           "spark.serializer",
           "org.apache.spark.serializer.KryoSerializer",
       )
       .getOrCreate()
   )

   # Writer options shared by every Hudi write in this script.
   common_config = {
       # Table layout and record identity.
       "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
       "hoodie.datasource.write.recordkey.field": "id",
       "hoodie.datasource.write.precombine.field": "timestamp",
       # NOTE(review): "timestamp__date_" is not a column of the schema used
       # further down in this script -- confirm this is intended.
       "hoodie.datasource.write.partitionpath.field": "timestamp__date_",
       "hoodie.table.name": "issue_8625",
       # Index settings.
       "hoodie.index.type": "GLOBAL_BLOOM",
       "hoodie.bloom.index.update.partition.path": "false",
       # Timestamp-based key generator settings.
       "hoodie.datasource.write.keygenerator.class":
           "org.apache.hudi.keygen.TimestampBasedKeyGenerator",
       "hoodie.deltastreamer.keygen.timebased.timestamp.type": "SCALAR",
       "hoodie.deltastreamer.keygen.timebased.output.dateformat": "yyyy/MM/dd",
       "hoodie.deltastreamer.keygen.timebased.timezone": "GMT",
       "hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit":
           "DAYS",
       # The payload class under test, set for both write and compaction.
       "hoodie.datasource.write.payload.class":
           "org.apache.hudi.common.model.PartialUpdateAvroPayload",
       "hoodie.compaction.payload.class":
           "org.apache.hudi.common.model.PartialUpdateAvroPayload",
       "hoodie.datasource.compaction.async.enable": "false",
       "hoodie.payload.ordering.field": "timestamp",
   }
   
   def get_structfield(fieldname, typestring):
       """Return a nullable StructField named *fieldname* for a type tag.

       The tags "NUMBER", "BOOLEAN", "RELATIVE_TIME", "LIST_DOUBLE" and
       "LIST_STRING" map to double, boolean, timestamp, array-of-double and
       array-of-string fields respectively (array elements are declared
       non-nullable). Any other tag produces a StringType field.
       """
       # Each entry pairs a tag with a zero-argument factory for its Spark type;
       # the factory is only invoked for the tag that matches.
       tag_table = (
           ("NUMBER", DoubleType),
           ("BOOLEAN", BooleanType),
           ("RELATIVE_TIME", TimestampType),
           ("LIST_DOUBLE", lambda: ArrayType(DoubleType(), False)),
           ("LIST_STRING", lambda: ArrayType(StringType(), False)),
       )
       for tag, make_type in tag_table:
           if typestring == tag:
               return StructField(fieldname, make_type(), True)

       # Unknown tags fall back to a plain string column.
       return StructField(fieldname, StringType(), True)
   
   def write_to_hudi(df):
       """Append *df* to the "issue_8625" Hudi table and report the outcome.

       The write uses the module-level ``common_config`` options, sets the
       table name and Hive sync table name to "issue_8625", and saves in
       append mode under ``s3a://path-to-private-table/issue_8625/``.

       Args:
           df: The DataFrame to write.

       Returns:
           True if the write finished, False if it raised. Errors are printed
           instead of being propagated, so callers must check the result.
       """
       table_name = "issue_8625"
       target_path = "s3a://path-to-private-table/" + table_name + "/"
       t1 = time.time()
       try:
           (
               df.write.format("org.apache.hudi")
               .options(**common_config)
               .option('hoodie.table.name', table_name)
               .option('hoodie.datasource.hive_sync.table', table_name)
               .mode("append")
               .save(target_path)
           )
       except Exception as e:
           # Best-effort write: report the failure and signal it to the caller
           # rather than falling through to the "completed" message.
           print(
               f"Table {table_name}: Error while writing. "
               f"Job aborted {str(e)}"
           )
           return False

       t2 = time.time()
       print(
           f"Table {table_name}: Writing to hudi completed in "
           f"{(t2-t1):.2f} seconds"
       )
       return True
   
   # Define the schema shared by both DataFrames; every column is nullable.
   schema = StructType([
       StructField("id", IntegerType(), True),
       StructField("b", IntegerType(), True),
       StructField("c", StringType(), True),
       StructField("d", DateType(), True),
       StructField("timestamp", TimestampType(), True),
       StructField("f", StringType(), True)
   ])

   # Initial batch: three fully populated records with the same timestamp.
   df = spark.createDataFrame([
       Row(
           id=1, b=2, c='string1', d=date(2000, 1, 1),
           timestamp=datetime.strptime(
               "2000-01-01 12:00:00", '%Y-%m-%d %H:%M:%S'),
           f="2000-01-01 12:00:00",
       ),
       Row(
           id=2, b=3, c='string2', d=date(2000, 2, 1),
           timestamp=datetime.strptime(
               "2000-01-01 12:00:00", '%Y-%m-%d %H:%M:%S'),
           f="2000-01-01 12:21:20",
       ),
       Row(
           id=4, b=5, c='string3', d=date(2000, 3, 1),
           timestamp=datetime.strptime(
               "2000-01-01 12:00:00", '%Y-%m-%d %H:%M:%S'),
           f="2000-01-03 12:00:00",
       ),
   ], schema)

   df.write.format("org.apache.hudi") \
       .options(**common_config) \
       .mode("append") \
       .save("file:///tmp/issue_8625_2")

   # Second batch: a later record for id=4 that only sets b and timestamp and
   # leaves c, d and f as None. Presumably this is the partial-update case the
   # configured payload class is meant to handle -- verify against the issue.
   newdf = spark.createDataFrame([
       Row(
           id=4, b=16, c=None, d=None,
           timestamp=datetime.strptime(
               "2000-01-01 12:35:00", '%Y-%m-%d %H:%M:%S'),
           f=None,
       )
   ], schema)
   newdf.show()

   # Append the second batch to the same local table path as the first.
   newdf.write.format("org.apache.hudi") \
       .options(**common_config) \
       .mode("append") \
       .save("file:///tmp/issue_8625_2")
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to