Hi,
I have a use case where I consume messages from Kafka (around 2K records per
batch) and, based on those messages, need to query a Hudi table, build a
dataset containing both updates and inserts, and push it back to the Hudi
table.
I tried the following, but it threw a NullPointerException from the
sparkSession Scala code, and rightly so, since sparkSession was being used
inside an executor.
Dataset<HudiRecord> hudiDs = companyStatusDf.flatMap(new FlatMapFunction<KafkaRecord, HudiRecord>() {
    @Override
    public Iterator<HudiRecord> call(KafkaRecord kafkaRecord) throws Exception {
        String prop1 = kafkaRecord.getProp1();
        String prop2 = kafkaRecord.getProp2();
        HudiRecord hudiRecord = sparkSession.read()       // NPE here: sparkSession used inside the executor
                .format(HUDI_DATASOURCE)
                .schema(<schema>)
                .load(<path>)
                .as(Encoders.bean(HudiRecord.class))
                .filter(<FILTER_ON_KAFKA_RECORD>);         // e.g. filter on prop1
        hudiRecord = transform();                          // modification of the Hudi record
        // emit both the new record (from Kafka) and the modified one (from Hudi)
        return Arrays.asList(kafkaRecord, hudiRecord).iterator();
    }
}, Encoders.bean(HudiRecord.class));
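What I think I need instead is to keep the Hudi read on the driver and express
the per-record lookup as a join, something like the rough sketch below
(illustrative only: hudiSchema and basePath stand in for the <schema>/<path>
placeholders above, and I am assuming the Hudi table also exposes a prop1
column to join on):

Dataset<HudiRecord> hudiTable = sparkSession.read()
        .format(HUDI_DATASOURCE)
        .schema(hudiSchema)                    // stands in for <schema> above
        .load(basePath + "/*/*/*")             // full table, both partition levels
        .as(Encoders.bean(HudiRecord.class));

Dataset<Row> joined = companyStatusDf.join(
        hudiTable,
        companyStatusDf.col("prop1").equalTo(hudiTable.col("prop1")),  // assumed join key on both sides
        "left_outer");                         // keep Kafka records with no Hudi match (the inserts)
// transform() and the upsert back to Hudi would follow from here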
In Hudi I have two levels of partitioning (year and month), so the 2K records
coming from Kafka will span multiple partitions. What is advisable: load the
full table first (e.g. "/*/*/*", as in the join sketch above), or first read
the Kafka records, work out which partitions they touch, and load only those
Hudi partitions? I believe the second option would be faster, i.e. loading
only the specific partitions, and that is what I was trying to do in the
snippet above. So if I want to leverage the partitions, is a collect() on the
Kafka dataset (to get the list of partitions and then supply them to Hudi) the
only option, or can I do it purely with Spark datasets?
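To make the second option concrete, this is roughly what I had in mind with
collect() (again only a sketch: it assumes the Kafka record exposes year and
month columns that map one-to-one to the Hudi partition path, and it reuses
basePath/hudiSchema from the sketch above):

// (col below is org.apache.spark.sql.functions.col)
List<Row> touchedPartitions = companyStatusDf
        .select(col("year"), col("month"))     // assumed partition columns on the Kafka record
        .distinct()
        .collectAsList();                      // small driver-side list for a 2K-record batch

String[] partitionPaths = touchedPartitions.stream()
        .map(r -> basePath + "/" + r.getAs("year") + "/" + r.getAs("month"))
        .toArray(String[]::new);

Dataset<HudiRecord> hudiSubset = sparkSession.read()
        .format(HUDI_DATASOURCE)
        .schema(hudiSchema)
        .load(partitionPaths)                  // only the partitions this batch touches
        .as(Encoders.bean(HudiRecord.class));
// then the same join/transform/upsert as in the full-table sketch above

That collectAsList() round trip to the driver is the part I would like to
avoid if a pure dataset solution exists.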