Hi Priya,

Please find attached an example I prepared some time back that can help you get started.

Thanks,
Udit


On 4/8/20, 3:44 PM, "Priyadarshini Shivram" 
<[email protected]> wrote:

    CAUTION: This email originated from outside of the organization. Do not 
click links or open attachments unless you can confirm the sender and know the 
content is safe.
    
    
    
    Hi,
    
    We are currently looking to implement Hudi for one of our projects that 
would need record-level inserts and updates, and we are specifically looking for 
examples in Python.
    
    I did check the project mentioned on this mailing list, 
http://mail-archives.apache.org/mod_mbox/hudi-dev/201909.mbox/%3ccakw-+5riybltr_prhsjz0uxg31c8oy_yjra3ycsjttjxvma...@mail.gmail.com%3E,
 but all of the examples provided are only for Scala.
    
    Thanks
    Priya
    

from pyspark.sql import *
from pyspark.sql.functions import lit

# Sample events used throughout this example, one tuple per event:
# (event_id, event_ts, event_type). All events share the same event_date,
# and each event_name is "event_name_" followed by the event id.
_eventFields = [
    ('100', '2015-01-01T13:51:39.340396Z', 'type1'),
    ('101', '2015-01-01T12:14:58.597216Z', 'type2'),
    ('102', '2015-01-01T13:51:40.417052Z', 'type3'),
    ('103', '2015-01-01T13:51:40.519832Z', 'type4'),
    ('104', '2015-01-01T12:15:00.512679Z', 'type1'),
    ('105', '2015-01-01T13:51:42.248818Z', 'type2'),
    ('106', '2015-01-01T13:51:44.735360Z', 'type3'),
    ('107', '2015-01-01T13:51:45.019544Z', 'type4'),
    ('108', '2015-01-01T13:51:45.208007Z', 'type1'),
    ('109', '2015-01-01T13:51:45.369689Z', 'type2'),
    ('110', '2015-01-01T12:15:05.664947Z', 'type3'),
    ('111', '2015-01-01T13:51:47.388239Z', 'type4'),
]

# Rows are built with keyword arguments so the field names travel with each
# row, exactly as in a hand-written Row(...) literal.
eventSeq = [
    Row(
        event_id=eventId,
        event_date='2015-01-01',
        event_name='event_name_' + eventId,
        event_ts=eventTs,
        event_type=eventType,
    )
    for eventId, eventTs, eventType in _eventFields
]

# Keep each row reachable under its own name as well (handy when the script
# is pasted into an interactive shell).
(event1, event2, event3, event4, event5, event6,
 event7, event8, event9, event10, event11, event12) = eventSeq

# NOTE: `spark` is not defined in this script; presumably it is the session
# object provided by the pyspark shell -- confirm for your environment.
df1 = spark.createDataFrame(eventSeq)

# Name of the Hudi table and the S3 location it is written to.
tableName = "uditme_hudi_events_cow_jan10_02"
tablePath = "s3://emr-users/uditme/hudi/tables/events/" + tableName

# Initial load: write df1 with the "insert" operation and save mode
# "overwrite". Records are keyed by event_id, partitioned by event_type,
# and event_ts is the precombine field. Hive sync is enabled and uses the
# same table name and partition field.
insertOptions = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.operation": "insert",
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.recordkey.field": "event_id",
    "hoodie.datasource.write.partitionpath.field": "event_type",
    "hoodie.datasource.write.precombine.field": "event_ts",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.table": tableName,
    "hoodie.datasource.hive_sync.partition_fields": "event_type",
    "hoodie.datasource.hive_sync.assume_date_partitioning": "false",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
}

insertWriter = df1.write.format("org.apache.hudi").options(**insertOptions)
insertWriter.mode("overwrite").save(tablePath)

# Update a column in a row
#
# Read the table back from tablePath (the "/*/*" glob is appended to the
# table path), select the record with event_id 104, replace its event_name
# with a literal value, and write the changed record back with the "upsert"
# operation in save mode "append".
df2 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
df3 = df2.filter("event_id = 104")
df4 = df3.withColumn("event_name", lit("uditevent1"))

# Same key / partition / precombine / Hive-sync settings as the initial
# write; only the operation differs.
upsertOptions = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.recordkey.field": "event_id",
    "hoodie.datasource.write.partitionpath.field": "event_type",
    "hoodie.datasource.write.precombine.field": "event_ts",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.table": tableName,
    "hoodie.datasource.hive_sync.partition_fields": "event_type",
    "hoodie.datasource.hive_sync.assume_date_partitioning": "false",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
}

df4.write.format("org.apache.hudi").options(**upsertOptions).mode("append").save(tablePath)

# Delete a row
#
# Read the table back, select the record with event_id 104, and write it
# with the "upsert" operation while setting the payload class to
# EmptyHoodieRecordPayload, in save mode "append".
df2 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
df3 = df2.filter("event_id = 104")

# NOTE(review): the fully-qualified payload class name may differ between
# Hudi releases -- confirm it against the Hudi version on your cluster.
deleteOptions = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.payload.class": "org.apache.hudi.EmptyHoodieRecordPayload",
    "hoodie.datasource.write.recordkey.field": "event_id",
    "hoodie.datasource.write.partitionpath.field": "event_type",
    "hoodie.datasource.write.precombine.field": "event_ts",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.table": tableName,
    "hoodie.datasource.hive_sync.partition_fields": "event_type",
    "hoodie.datasource.hive_sync.assume_date_partitioning": "false",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
}

df3.write.format("org.apache.hudi").options(**deleteOptions).mode("append").save(tablePath)

Reply via email to