Hi,
I have a problem statement where I am consuming messages from Kafka and then, 
depending on those Kafka messages (about 2K records), I need to query a Hudi 
table, create a dataset (with both updates and inserts), and push it back to 
the Hudi table.

I tried the following, but it threw a NullPointerException from the sparkSession 
Scala code, and rightly so, as sparkSession was being used inside an executor.

Dataset<CompanyStatusGoldenRecord> hudiDs = companyStatusDf.flatMap(
        new FlatMapFunction<KafkaRecord, CompanyStatusGoldenRecord>() {
            @Override
            public Iterator<CompanyStatusGoldenRecord> call(KafkaRecord kafkaRecord) throws Exception {
                String prop1 = kafkaRecord.getProp1();
                String prop2 = kafkaRecord.getProp2();
                // NPE is thrown here: sparkSession is only usable on the driver, not on an executor
                HudiRecord hudiRecord = sparkSession.read()
                        .format(HUDI_DATASOURCE)
                        .schema(<schema>)
                        .load(<path>)
                        .as(Encoders.bean(HudiRecord.class))
                        .filter(<FILTER_ON_KAFKA_RECORD> say prop1)
                        .first();
                hudiRecord = transform(hudiRecord); // modification of the Hudi record
                // emit both the incoming Kafka record (insert) and the modified Hudi record (update)
                return Arrays.asList(kafkaRecord, hudiRecord).iterator();
            }
        }, Encoders.bean(CompanyStatusGoldenRecord.class));

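For comparison, the same lookup expressed as a single driver-side join (so that 
sparkSession is never touched on an executor) would look roughly like the sketch 
below. This is only a sketch: basePath, the "prop1" join key, the table name and 
the write options are assumptions rather than the real table config, and the 
projection of the joined columns into the target schema is elided.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

// Read the Hudi table once as part of the driver-built plan
// (optionally only selected partitions, see below).
Dataset<Row> hudiDf = sparkSession.read()
        .format("hudi")                  // HUDI_DATASOURCE in the snippet above
        .load(basePath);                 // assumed table base path

// Turn the per-record lookup into one distributed join on the lookup key.
Dataset<Row> kafkaDf = companyStatusDf.toDF();
Dataset<Row> joined = kafkaDf.join(hudiDf,
                kafkaDf.col("prop1").equalTo(hudiDf.col("prop1")), "left_outer")
        .drop(hudiDf.col("prop1"));      // keep a single copy of the join key

// Matched rows are updates, unmatched rows are inserts. After projecting them into
// the target schema (the transform step above), everything goes back in one upsert.
Dataset<Row> result = joined;            // placeholder for the real transformation
result.write()
        .format("hudi")
        .option("hoodie.datasource.write.operation", "upsert")
        .option("hoodie.datasource.write.recordkey.field", "prop1")   // assumed record key
        .option("hoodie.datasource.write.precombine.field", "prop2")  // assumed ordering field
        .option("hoodie.table.name", "company_status")                // assumed table name
        .mode(SaveMode.Append)
        .save(basePath);
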
In Hudi I have two levels of partitioning (year and month), so the 2K records 
coming from Kafka will span multiple partitions. What is advisable: load the 
full table first (e.g. "/*/*/*"), or first read the Kafka records, work out 
which partitions need to be hit, and then load only those Hudi partitions? I 
believe the second option would be faster, i.e. loading only the specific 
partitions, and that is what I was attempting in the snippet above. So if I 
have to leverage the partitions, is calling collect() on the Kafka dataset to 
get the list of partitions and then supplying it to Hudi the only option, or 
can I do it with Spark datasets alone?
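
To make the question concrete, the collect()-based variant I have in mind would 
look roughly like this. It is only a sketch: it assumes the Kafka record exposes 
year and month fields that match the table's partition columns, and basePath is 
again a placeholder.

import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;

// Collect only the distinct partition values of the small Kafka dataset
// (cheap for ~2K records, since only the two partition columns reach the driver).
List<Row> parts = companyStatusDf.toDF()
        .select(col("year"), col("month"))
        .distinct()
        .collectAsList();

Object[] years  = parts.stream().map(r -> r.get(0)).distinct().toArray();
Object[] months = parts.stream().map(r -> r.get(1)).distinct().toArray();

// Load the table with a filter on the partition columns so that only the matching
// year/month partitions are scanned (the year x month combination may over-select
// a little, but it is still far less than "/*/*/*").
Dataset<Row> hudiDf = sparkSession.read()
        .format("hudi")
        .load(basePath)
        .filter(col("year").isin(years).and(col("month").isin(months)));

// hudiDf can then be joined with the Kafka dataset as in the earlier sketch.

The dataset-only alternative I can think of is joining directly on the partition 
columns and relying on Spark 3.x dynamic partition pruning, but I am not sure how 
reliably that kicks in for a Hudi source, hence the question.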
