Hi Priya,
PFA an example I prepared some time back that should help you get started.
Thanks,
Udit
On 4/8/20, 3:44 PM, "Priyadarshini Shivram"
<[email protected]> wrote:
Hi,
We are currently looking to implement Hudi for one of our projects that
needs record-level inserts and updates, and we are specifically looking for
examples in Python.
I did check the project mentioned in this mailing list
http://mail-archives.apache.org/mod_mbox/hudi-dev/201909.mbox/%3ccakw-+5riybltr_prhsjz0uxg31c8oy_yjra3ycsjttjxvma...@mail.gmail.com%3E,
but all of the examples provided are only in Scala.
Thanks
Priya
# Assumes this runs in a pyspark shell (e.g. on an EMR cluster) launched with the
# Hudi Spark bundle on the classpath, where the `spark` SparkSession is predefined.
from pyspark.sql import Row
from pyspark.sql.functions import lit
event1 = Row(event_id='100', event_date='2015-01-01', event_name='event_name_100', event_ts='2015-01-01T13:51:39.340396Z', event_type = 'type1')
event2 = Row(event_id='101', event_date='2015-01-01', event_name='event_name_101', event_ts='2015-01-01T12:14:58.597216Z', event_type = 'type2')
event3 = Row(event_id='102', event_date='2015-01-01', event_name='event_name_102', event_ts='2015-01-01T13:51:40.417052Z', event_type = 'type3')
event4 = Row(event_id='103', event_date='2015-01-01', event_name='event_name_103', event_ts='2015-01-01T13:51:40.519832Z', event_type = 'type4')
event5 = Row(event_id='104', event_date='2015-01-01', event_name='event_name_104', event_ts='2015-01-01T12:15:00.512679Z', event_type = 'type1')
event6 = Row(event_id='105', event_date='2015-01-01', event_name='event_name_105', event_ts='2015-01-01T13:51:42.248818Z', event_type = 'type2')
event7 = Row(event_id='106', event_date='2015-01-01', event_name='event_name_106', event_ts='2015-01-01T13:51:44.735360Z', event_type = 'type3')
event8 = Row(event_id='107', event_date='2015-01-01', event_name='event_name_107', event_ts='2015-01-01T13:51:45.019544Z', event_type = 'type4')
event9 = Row(event_id='108', event_date='2015-01-01', event_name='event_name_108', event_ts='2015-01-01T13:51:45.208007Z', event_type = 'type1')
event10 = Row(event_id='109', event_date='2015-01-01', event_name='event_name_109', event_ts='2015-01-01T13:51:45.369689Z', event_type = 'type2')
event11 = Row(event_id='110', event_date='2015-01-01', event_name='event_name_110', event_ts='2015-01-01T12:15:05.664947Z', event_type = 'type3')
event12 = Row(event_id='111', event_date='2015-01-01', event_name='event_name_111', event_ts='2015-01-01T13:51:47.388239Z', event_type = 'type4')
eventSeq = [event1, event2, event3, event4, event5, event6, event7, event8, event9, event10, event11, event12]
df1 = spark.createDataFrame(eventSeq)
tableName = "uditme_hudi_events_cow_jan10_02"
tablePath = "s3://emr-users/uditme/hudi/tables/events/" + tableName
# Insert: write the events as a new Copy-on-Write Hudi table and sync it to the Hive metastore
df1.write.format("org.apache.hudi") \
    .option("hoodie.table.name", tableName) \
    .option("hoodie.datasource.write.operation", "insert") \
    .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE") \
    .option("hoodie.datasource.write.recordkey.field", "event_id") \
    .option("hoodie.datasource.write.partitionpath.field", "event_type") \
    .option("hoodie.datasource.write.precombine.field", "event_ts") \
    .option("hoodie.datasource.hive_sync.enable", "true") \
    .option("hoodie.datasource.hive_sync.table", tableName) \
    .option("hoodie.datasource.hive_sync.partition_fields", "event_type") \
    .option("hoodie.datasource.hive_sync.assume_date_partitioning", "false") \
    .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor") \
    .mode("overwrite") \
    .save(tablePath)
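# (Optional) Sanity-check the insert by reading the table back through the Hudi
# datasource. This is just a sketch; it reads the snapshot view of the
# Copy-on-Write table written above.
spark.read.format("org.apache.hudi").load(tablePath + "/*/*") \
    .select("event_id", "event_name", "event_type", "event_ts").show()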
# Update a column in a single row (event_id 104)
df2 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
df3 = df2.filter("event_id = 104")
df4 = df3.withColumn("event_name", lit("uditevent1"))
df4.write.format("org.apache.hudi") \
    .option("hoodie.table.name", tableName) \
    .option("hoodie.datasource.write.operation", "upsert") \
    .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE") \
    .option("hoodie.datasource.write.recordkey.field", "event_id") \
    .option("hoodie.datasource.write.partitionpath.field", "event_type") \
    .option("hoodie.datasource.write.precombine.field", "event_ts") \
    .option("hoodie.datasource.hive_sync.enable", "true") \
    .option("hoodie.datasource.hive_sync.table", tableName) \
    .option("hoodie.datasource.hive_sync.partition_fields", "event_type") \
    .option("hoodie.datasource.hive_sync.assume_date_partitioning", "false") \
    .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor") \
    .mode("append") \
    .save(tablePath)
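# (Optional) Verify the upsert: a minimal check, re-reading the table and
# confirming that event_name for event_id 104 now reads "uditevent1".
spark.read.format("org.apache.hudi").load(tablePath + "/*/*") \
    .filter("event_id = '104'").select("event_id", "event_name").show()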
# Delete a row by upserting it with the EmptyHoodieRecordPayload
df2 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
df3 = df2.filter("event_id = 104")
df3.write.format("org.apache.hudi") \
    .option("hoodie.table.name", tableName) \
    .option("hoodie.datasource.write.operation", "upsert") \
    .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE") \
    .option("hoodie.datasource.write.payload.class", "org.apache.hudi.EmptyHoodieRecordPayload") \
    .option("hoodie.datasource.write.recordkey.field", "event_id") \
    .option("hoodie.datasource.write.partitionpath.field", "event_type") \
    .option("hoodie.datasource.write.precombine.field", "event_ts") \
    .option("hoodie.datasource.hive_sync.enable", "true") \
    .option("hoodie.datasource.hive_sync.table", tableName) \
    .option("hoodie.datasource.hive_sync.partition_fields", "event_type") \
    .option("hoodie.datasource.hive_sync.assume_date_partitioning", "false") \
    .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor") \
    .mode("append") \
    .save(tablePath)
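# (Optional) Verify the delete: a minimal check that the record keyed by
# event_id 104 is no longer in the table (the count should be 0).
spark.read.format("org.apache.hudi").load(tablePath + "/*/*") \
    .filter("event_id = '104'").count()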