Hi Yaswanth, PFA an example I prepared some time back that should help you get started.
Thanks,
Udit

On 4/8/20, 3:21 PM, "Atluri Yaswanth" <yaswanth.atl...@gmail.com> wrote:

    Hi Team,

    I would like to know whether there are any scripts in PySpark to upsert data in a Hudi dataset. I am working with Scala now, but I want to use PySpark, as my data is not in good shape (I need to use various libraries on it).

    Thanks in advance,
    Yaswanth
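The example below assumes a pyspark session with the Hudi Spark bundle already on the classpath; in the pyspark shell on EMR the `spark` variable exists out of the box. If you are building the session yourself, a minimal sketch would look like this (the jar path is a placeholder, not part of my example):

from pyspark.sql import SparkSession

# Sketch only: point spark.jars at wherever your hudi-spark-bundle jar lives.
# Hudi requires Kryo serialization for its writes.
spark = (SparkSession.builder
         .appName("hudi-pyspark-upsert-example")
         .config("spark.jars", "/path/to/hudi-spark-bundle.jar")
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .getOrCreate())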
from pyspark.sql import Row
from pyspark.sql.functions import lit

# Sample events: (event_id, event_ts, event_type). event_date is constant and
# event_name is derived from the id, so they are filled in below.
raw_events = [
    ('100', '2015-01-01T13:51:39.340396Z', 'type1'),
    ('101', '2015-01-01T12:14:58.597216Z', 'type2'),
    ('102', '2015-01-01T13:51:40.417052Z', 'type3'),
    ('103', '2015-01-01T13:51:40.519832Z', 'type4'),
    ('104', '2015-01-01T12:15:00.512679Z', 'type1'),
    ('105', '2015-01-01T13:51:42.248818Z', 'type2'),
    ('106', '2015-01-01T13:51:44.735360Z', 'type3'),
    ('107', '2015-01-01T13:51:45.019544Z', 'type4'),
    ('108', '2015-01-01T13:51:45.208007Z', 'type1'),
    ('109', '2015-01-01T13:51:45.369689Z', 'type2'),
    ('110', '2015-01-01T12:15:05.664947Z', 'type3'),
    ('111', '2015-01-01T13:51:47.388239Z', 'type4'),
]
eventSeq = [Row(event_id=i, event_date='2015-01-01', event_name='event_name_' + i,
                event_ts=ts, event_type=t) for (i, ts, t) in raw_events]
df1 = spark.createDataFrame(eventSeq)

tableName = "uditme_hudi_events_cow_jan10_02"
tablePath = "s3://emr-users/uditme/hudi/tables/events/" + tableName

# Options shared by every write: table identity, record key, partition path,
# precombine field, and Hive sync settings.
hudi_options = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.recordkey.field": "event_id",
    "hoodie.datasource.write.partitionpath.field": "event_type",
    "hoodie.datasource.write.precombine.field": "event_ts",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.table": tableName,
    "hoodie.datasource.hive_sync.partition_fields": "event_type",
    "hoodie.datasource.hive_sync.assume_date_partitioning": "false",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
}

# Initial insert of the events table.
df1.write.format("org.apache.hudi") \
    .options(**hudi_options) \
    .option("hoodie.datasource.write.operation", "insert") \
    .mode("overwrite") \
    .save(tablePath)

# Update a column in a row: read the record back, change it, and upsert it.
df2 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
df3 = df2.filter("event_id = 104")
df4 = df3.withColumn("event_name", lit("uditevent1"))
df4.write.format("org.apache.hudi") \
    .options(**hudi_options) \
    .option("hoodie.datasource.write.operation", "upsert") \
    .mode("append") \
    .save(tablePath)

# Delete a row: upsert the same key with EmptyHoodieRecordPayload, which tells
# Hudi to drop the matching record instead of merging it.
df2 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
df3 = df2.filter("event_id = 104")
df3.write.format("org.apache.hudi") \
    .options(**hudi_options) \
    .option("hoodie.datasource.write.operation", "upsert") \
    .option("hoodie.datasource.write.payload.class", "org.apache.hudi.EmptyHoodieRecordPayload") \
    .mode("append") \
    .save(tablePath)
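To sanity-check each step, you can re-read the table with the same session. This is just a quick sketch on top of the example above; after the delete step the event_id = 104 filter should come back empty:

result = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
result.select("event_id", "event_name", "event_type").orderBy("event_id").show(20, False)
result.filter("event_id = 104").show()  # no rows once the delete has been applied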