Hi Yaswanth,

PFA an example I prepared some time back that should help you get started.

Thanks,
Udit

On 4/8/20, 3:21 PM, "Atluri Yaswanth" <yaswanth.atl...@gmail.com> wrote:

    Hi Team,
    
    I would like to know whether there are any example scripts in PySpark for
    upserting data into a Hudi dataset.
    
    I am working with Scala now, but I want to use PySpark because my data is not
    in good shape (I need to use various libraries on it inside the job).
    
    Thanks in advance
    yaswanth
    

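The example below assumes it is run from a pyspark shell, so a spark session already
exists, with the Hudi Spark bundle jar on the classpath and Kryo serialization enabled.
If you need to build the session yourself, a minimal sketch (the app name is just a
placeholder) would look like this:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("hudi-pyspark-example") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()
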
from pyspark.sql import Row
from pyspark.sql.functions import lit

event1 = Row(event_id='100', event_date='2015-01-01', event_name='event_name_100', event_ts='2015-01-01T13:51:39.340396Z', event_type = 'type1')
event2 = Row(event_id='101', event_date='2015-01-01', event_name='event_name_101', event_ts='2015-01-01T12:14:58.597216Z', event_type = 'type2')
event3 = Row(event_id='102', event_date='2015-01-01', event_name='event_name_102', event_ts='2015-01-01T13:51:40.417052Z', event_type = 'type3')
event4 = Row(event_id='103', event_date='2015-01-01', event_name='event_name_103', event_ts='2015-01-01T13:51:40.519832Z', event_type = 'type4')
event5 = Row(event_id='104', event_date='2015-01-01', event_name='event_name_104', event_ts='2015-01-01T12:15:00.512679Z', event_type = 'type1')
event6 = Row(event_id='105', event_date='2015-01-01', event_name='event_name_105', event_ts='2015-01-01T13:51:42.248818Z', event_type = 'type2')
event7 = Row(event_id='106', event_date='2015-01-01', event_name='event_name_106', event_ts='2015-01-01T13:51:44.735360Z', event_type = 'type3')
event8 = Row(event_id='107', event_date='2015-01-01', event_name='event_name_107', event_ts='2015-01-01T13:51:45.019544Z', event_type = 'type4')
event9 = Row(event_id='108', event_date='2015-01-01', event_name='event_name_108', event_ts='2015-01-01T13:51:45.208007Z', event_type = 'type1')
event10 = Row(event_id='109', event_date='2015-01-01', event_name='event_name_109', event_ts='2015-01-01T13:51:45.369689Z', event_type = 'type2')
event11 = Row(event_id='110', event_date='2015-01-01', event_name='event_name_110', event_ts='2015-01-01T12:15:05.664947Z', event_type = 'type3')
event12 = Row(event_id='111', event_date='2015-01-01', event_name='event_name_111', event_ts='2015-01-01T13:51:47.388239Z', event_type = 'type4')

eventSeq = [event1, event2, event3, event4, event5, event6, event7, event8, event9, event10, event11, event12]
df1 = spark.createDataFrame(eventSeq)

tableName = "uditme_hudi_events_cow_jan10_02"
tablePath = "s3://emr-users/uditme/hudi/tables/events/" + tableName
# Write the initial batch as a COPY_ON_WRITE Hudi table, partitioned by event_type,
# and sync the table definition to the Hive metastore.
df1.write.format("org.apache.hudi") \
    .option("hoodie.table.name", tableName) \
    .option("hoodie.datasource.write.operation", "insert") \
    .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE") \
    .option("hoodie.datasource.write.recordkey.field", "event_id") \
    .option("hoodie.datasource.write.partitionpath.field", "event_type") \
    .option("hoodie.datasource.write.precombine.field", "event_ts") \
    .option("hoodie.datasource.hive_sync.enable", "true") \
    .option("hoodie.datasource.hive_sync.table", tableName) \
    .option("hoodie.datasource.hive_sync.partition_fields", "event_type") \
    .option("hoodie.datasource.hive_sync.assume_date_partitioning", "false") \
    .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor") \
    .mode("overwrite") \
    .save(tablePath)
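
# Optional sanity check (a verification sketch): read the table back from S3 (the
# extra "/*/*" globs the partition directories). Since hive_sync is enabled above,
# the same data should also be queryable by table name once the sync completes.
spark.read.format("org.apache.hudi").load(tablePath + "/*/*") \
    .select("event_id", "event_name", "event_type").show()
spark.sql("SELECT count(*) FROM " + tableName).show()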

# Update a column in one row
df2 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
df3 = df2.filter("event_id = 104")
df4 = df3.withColumn("event_name", lit("uditevent1"))
df4.write.format("org.apache.hudi") \
    .option("hoodie.table.name", tableName) \
    .option("hoodie.datasource.write.operation", "upsert") \
    .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE") \
    .option("hoodie.datasource.write.recordkey.field", "event_id") \
    .option("hoodie.datasource.write.partitionpath.field", "event_type") \
    .option("hoodie.datasource.write.precombine.field", "event_ts") \
    .option("hoodie.datasource.hive_sync.enable", "true") \
    .option("hoodie.datasource.hive_sync.table", tableName) \
    .option("hoodie.datasource.hive_sync.partition_fields", "event_type") \
    .option("hoodie.datasource.hive_sync.assume_date_partitioning", "false") \
    .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor") \
    .mode("append") \
    .save(tablePath)
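
# Optional check: event 104 should now carry the new event_name, and its
# _hoodie_commit_time metadata column should reflect the later upsert commit.
spark.read.format("org.apache.hudi").load(tablePath + "/*/*") \
    .filter("event_id = 104") \
    .select("event_id", "event_name", "_hoodie_commit_time").show()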

# Delete a row: upserting with EmptyHoodieRecordPayload as the payload class
# removes the records for the matching keys.
df2 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
df3 = df2.filter("event_id = 104")
df3.write.format("org.apache.hudi") \
    .option("hoodie.table.name", tableName) \
    .option("hoodie.datasource.write.operation", "upsert") \
    .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE") \
    .option("hoodie.datasource.write.payload.class", "org.apache.hudi.EmptyHoodieRecordPayload") \
    .option("hoodie.datasource.write.recordkey.field", "event_id") \
    .option("hoodie.datasource.write.partitionpath.field", "event_type") \
    .option("hoodie.datasource.write.precombine.field", "event_ts") \
    .option("hoodie.datasource.hive_sync.enable", "true") \
    .option("hoodie.datasource.hive_sync.table", tableName) \
    .option("hoodie.datasource.hive_sync.partition_fields", "event_type") \
    .option("hoodie.datasource.hive_sync.assume_date_partitioning", "false") \
    .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor") \
    .mode("append") \
    .save(tablePath)
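
# Optional check: after the delete commit, the key should no longer be readable.
spark.read.format("org.apache.hudi").load(tablePath + "/*/*") \
    .filter("event_id = 104").count()   # expected to be 0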
